Diffstat (limited to 'src')
-rw-r--r--  src/bun.js/node/fs_events.zig            |  29
-rw-r--r--  src/bun.js/node/node_fs_watcher.zig      | 104
-rw-r--r--  src/bun.js/node/path_watcher.zig         | 393
-rw-r--r--  src/bun.zig                              |  32
-rw-r--r--  src/js/node/fs.promises.ts               |  56
-rw-r--r--  src/js/out/modules/node/fs.promises.js   |   2
6 files changed, 423 insertions, 193 deletions
diff --git a/src/bun.js/node/fs_events.zig b/src/bun.js/node/fs_events.zig
index cfcd50993..70b41fc33 100644
--- a/src/bun.js/node/fs_events.zig
+++ b/src/bun.js/node/fs_events.zig
@@ -354,25 +354,27 @@ pub const FSEventsLoop = struct {
for (loop.watchers.slice()) |watcher| {
if (watcher) |handle| {
+ const handle_path = handle.path;
+
for (paths, 0..) |path_ptr, i| {
var flags = event_flags[i];
var path = path_ptr[0..bun.len(path_ptr)];
// Filter out paths that are outside handle's request
- if (path.len < handle.path.len or !bun.strings.startsWith(path, handle.path)) {
+ if (path.len < handle_path.len or !bun.strings.startsWith(path, handle_path)) {
continue;
}
const is_file = (flags & kFSEventStreamEventFlagItemIsDir) == 0;
// Remove common prefix, unless the watched folder is "/"
- if (!(handle.path.len == 1 and handle.path[0] == '/')) {
- path = path[handle.path.len..];
+ if (!(handle_path.len == 1 and handle_path[0] == '/')) {
+ path = path[handle_path.len..];
// Ignore events with path equal to directory itself
if (path.len <= 1 and is_file) {
continue;
}
if (path.len == 0) {
- // Since we're using fsevents to watch the file itself, path == handle.path, and we now need to get the basename of the file back
+ // Since we're using fsevents to watch the file itself, path == handle_path, and we now need to get the basename of the file back
while (path.len > 0) {
if (bun.strings.startsWithChar(path, '/')) {
path = path[1..];
@@ -403,7 +405,7 @@ pub const FSEventsLoop = struct {
}
}
- handle.emit(path, is_file, is_rename);
+ handle.emit(path, is_file, if (is_rename) .rename else .change);
}
handle.flush();
}
@@ -554,7 +556,7 @@ pub const FSEventsLoop = struct {
this.signal_source = null;
this.sem.deinit();
- this.mutex.deinit();
+
if (this.watcher_count > 0) {
while (this.watchers.popOrNull()) |watcher| {
if (watcher) |w| {
@@ -578,11 +580,18 @@ pub const FSEventsWatcher = struct {
recursive: bool,
ctx: ?*anyopaque,
- const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, is_rename: bool) void;
- const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void;
+ pub const EventType = enum {
+ rename,
+ change,
+ @"error",
+ };
+
+ pub const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, event_type: EventType) void;
+ pub const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void;
pub fn init(loop: *FSEventsLoop, path: string, recursive: bool, callback: Callback, updateEnd: UpdateEndCallback, ctx: ?*anyopaque) *FSEventsWatcher {
var this = bun.default_allocator.create(FSEventsWatcher) catch unreachable;
+
this.* = FSEventsWatcher{
.path = path,
.callback = callback,
@@ -596,8 +605,8 @@ pub const FSEventsWatcher = struct {
return this;
}
- pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, is_rename: bool) void {
- this.callback(this.ctx, path, is_file, is_rename);
+ pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, event_type: EventType) void {
+ this.callback(this.ctx, path, is_file, event_type);
}
pub fn flush(this: *FSEventsWatcher) void {
diff --git a/src/bun.js/node/node_fs_watcher.zig b/src/bun.js/node/node_fs_watcher.zig
index 72d9861c0..d6d2f8a2f 100644
--- a/src/bun.js/node/node_fs_watcher.zig
+++ b/src/bun.js/node/node_fs_watcher.zig
@@ -6,7 +6,6 @@ const Path = @import("../../resolver/resolve_path.zig");
const Encoder = JSC.WebCore.Encoder;
const Mutex = @import("../../lock.zig").Lock;
-const FSEvents = @import("./fs_events.zig");
const PathWatcher = @import("./path_watcher.zig");
const VirtualMachine = JSC.VirtualMachine;
@@ -21,15 +20,12 @@ const Environment = bun.Environment;
pub const FSWatcher = struct {
ctx: *VirtualMachine,
verbose: bool = false,
- entry_path: ?string = null,
- entry_dir: string = "",
// JSObject
mutex: Mutex,
signal: ?*JSC.AbortSignal,
persistent: bool,
- default_watcher: ?*PathWatcher.PathWatcher,
- fsevents_watcher: ?*FSEvents.FSEventsWatcher,
+ path_watcher: ?*PathWatcher.PathWatcher,
poll_ref: JSC.PollRef = .{},
globalThis: *JSC.JSGlobalObject,
js_this: JSC.JSValue,
@@ -53,10 +49,6 @@ pub const FSWatcher = struct {
pub fn deinit(this: *FSWatcher) void {
// stop all managers and signals
this.detach();
- if (this.entry_path) |path| {
- this.entry_path = null;
- bun.default_allocator.free(path);
- }
bun.default_allocator.destroy(this);
}
@@ -154,33 +146,7 @@ pub const FSWatcher = struct {
}
};
- pub fn onFSEventUpdate(
- ctx: ?*anyopaque,
- path: string,
- is_file: bool,
- is_rename: bool,
- ) void {
- // only called by FSEventUpdate
-
- const this = bun.cast(*FSWatcher, ctx.?);
-
- const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
- const event_type: FSWatchTask.EventType = if (is_rename) .rename else .change;
-
- if (this.verbose) {
- if (is_file) {
- Output.prettyErrorln("<r> <d>File changed: {s}<r>", .{relative_path});
- } else {
- Output.prettyErrorln("<r> <d>Dir changed: {s}<r>", .{relative_path});
- }
- }
-
- this.current_task.append(relative_path, event_type, true);
- }
-
pub fn onPathUpdate(ctx: ?*anyopaque, path: string, is_file: bool, event_type: PathWatcher.PathWatcher.EventType) void {
- // only called by PathWatcher
-
const this = bun.cast(*FSWatcher, ctx.?);
const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable;
@@ -212,7 +178,6 @@ pub const FSWatcher = struct {
Output.flush();
}
// we only enqueue after all events are processed
- // this is called by FSEventsWatcher or PathWatcher
this.current_task.enqueue();
}
@@ -558,14 +523,9 @@ pub const FSWatcher = struct {
signal.detach(this);
}
- if (this.default_watcher) |default_watcher| {
- this.default_watcher = null;
- default_watcher.deinit();
- }
-
- if (this.fsevents_watcher) |fsevents_watcher| {
- this.fsevents_watcher = null;
- fsevents_watcher.deinit();
+ if (this.path_watcher) |path_watcher| {
+ this.path_watcher = null;
+ path_watcher.deinit();
}
if (this.persistent) {
@@ -584,37 +544,6 @@ pub const FSWatcher = struct {
this.deinit();
}
- const PathResult = struct {
- fd: StoredFileDescriptorType = 0,
- is_file: bool = true,
- };
-
- // TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
- fn fdFromAbsolutePathZ(
- absolute_path_z: [:0]const u8,
- ) !PathResult {
- if (std.fs.openIterableDirAbsoluteZ(absolute_path_z, .{
- .access_sub_paths = true,
- })) |iterable_dir| {
- return PathResult{
- .fd = iterable_dir.dir.fd,
- .is_file = false,
- };
- } else |err| {
- if (err == error.NotDir) {
- var file = try std.fs.openFileAbsoluteZ(absolute_path_z, .{ .mode = .read_only });
- return PathResult{
- .fd = file.handle,
- .is_file = true,
- };
- } else {
- return err;
- }
- }
-
- unreachable;
- }
-
pub fn init(args: Arguments) !*FSWatcher {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
var slice = args.path.slice();
@@ -635,8 +564,6 @@ pub const FSWatcher = struct {
buf[file_path.len] = 0;
var file_path_z = buf[0..file_path.len :0];
- var fs_type = try fdFromAbsolutePathZ(file_path_z);
-
var ctx = try bun.default_allocator.create(FSWatcher);
const vm = args.global_this.bunVM();
ctx.* = .{
@@ -648,8 +575,7 @@ pub const FSWatcher = struct {
.mutex = Mutex.init(),
.signal = if (args.signal) |s| s.ref() else null,
.persistent = args.persistent,
- .default_watcher = null,
- .fsevents_watcher = null,
+ .path_watcher = null,
.globalThis = args.global_this,
.js_this = .zero,
.encoding = args.encoding,
@@ -661,25 +587,7 @@ pub const FSWatcher = struct {
errdefer ctx.deinit();
- if (comptime Environment.isMac) {
- if (!fs_type.is_file) {
- var dir_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
- ctx.entry_path = dir_path_clone;
- ctx.entry_dir = dir_path_clone;
-
- ctx.fsevents_watcher = try FSEvents.watch(dir_path_clone, args.recursive, onFSEventUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
-
- ctx.initJS(args.listener);
- return ctx;
- }
- }
-
- var file_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable;
-
- ctx.entry_path = file_path_clone;
- ctx.entry_dir = std.fs.path.dirname(file_path_clone) orelse file_path_clone;
- ctx.default_watcher = try PathWatcher.watch(vm, file_path_clone, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
-
+ ctx.path_watcher = try PathWatcher.watch(vm, file_path_z, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx));
ctx.initJS(args.listener);
return ctx;
}
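
With node_fs_watcher.zig now delegating everything to PathWatcher.watch, the JavaScript-facing behavior is unchanged: fs.watch still invokes its listener with the rename/change event types that onPathUpdate forwards. A minimal consumer sketch follows (the watched path and the handler are illustrative, not part of this diff):

```ts
import { watch } from "node:fs";

// The listener receives the same "rename" / "change" strings that
// FSWatchTask.EventType maps to on the native side.
const watcher = watch("./src", { recursive: true }, (eventType, filename) => {
  console.log(`${eventType}: ${filename ?? "<unknown>"}`);
});

// close() detaches the native PathWatcher created in FSWatcher.init.
setTimeout(() => watcher.close(), 10_000);
```
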
diff --git a/src/bun.js/node/path_watcher.zig b/src/bun.js/node/path_watcher.zig
index 5a22cb783..49caa93d0 100644
--- a/src/bun.js/node/path_watcher.zig
+++ b/src/bun.js/node/path_watcher.zig
@@ -4,6 +4,7 @@ const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue;
const Path = @import("../../resolver/resolve_path.zig");
const Fs = @import("../../fs.zig");
const Mutex = @import("../../lock.zig").Lock;
+const FSEvents = @import("./fs_events.zig");
const bun = @import("root").bun;
const Output = bun.Output;
@@ -30,9 +31,11 @@ pub const PathWatcherManager = struct {
watcher_count: u32 = 0,
vm: *JSC.VirtualMachine,
file_paths: bun.StringHashMap(PathInfo),
+ current_fd_task: bun.FDHashMap(*DirectoryRegisterTask),
deinit_on_last_watcher: bool = false,
+ pending_tasks: u32 = 0,
+ deinit_on_last_task: bool = false,
mutex: Mutex,
-
const PathInfo = struct {
fd: StoredFileDescriptorType = 0,
is_file: bool = true,
@@ -42,10 +45,14 @@ pub const PathWatcherManager = struct {
hash: Watcher.HashType,
};
+ // TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
fn _fdFromAbsolutePathZ(
this: *PathWatcherManager,
path: [:0]const u8,
) !PathInfo {
+ this.mutex.lock();
+ defer this.mutex.unlock();
+
if (this.file_paths.getEntry(path)) |entry| {
var info = entry.value_ptr;
info.refs += 1;
@@ -54,42 +61,39 @@ pub const PathWatcherManager = struct {
const cloned_path = try bun.default_allocator.dupeZ(u8, path);
errdefer bun.default_allocator.destroy(cloned_path);
- var stat = try bun.C.lstat_absolute(cloned_path);
- var result = PathInfo{
- .path = cloned_path,
- .dirname = cloned_path,
- .hash = Watcher.getHash(cloned_path),
- .refs = 1,
- };
-
- switch (stat.kind) {
- .sym_link => {
+ if (std.fs.openIterableDirAbsoluteZ(cloned_path, .{
+ .access_sub_paths = true,
+ })) |iterable_dir| {
+ const result = PathInfo{
+ .fd = iterable_dir.dir.fd,
+ .is_file = false,
+ .path = cloned_path,
+ .dirname = cloned_path,
+ .hash = Watcher.getHash(cloned_path),
+ .refs = 1,
+ };
+ _ = try this.file_paths.put(cloned_path, result);
+ return result;
+ } else |err| {
+ if (err == error.NotDir) {
var file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only });
- result.fd = file.handle;
- const _stat = try file.stat();
-
- result.is_file = _stat.kind != .directory;
- if (result.is_file) {
- result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path;
- }
- },
- .directory => {
- const dir = (try std.fs.openIterableDirAbsoluteZ(cloned_path, .{
- .access_sub_paths = true,
- })).dir;
- result.fd = dir.fd;
- result.is_file = false;
- },
- else => {
- const file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only });
- result.fd = file.handle;
- result.is_file = true;
- result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path;
- },
+ const result = PathInfo{
+ .fd = file.handle,
+ .is_file = true,
+ .path = cloned_path,
+ // if it is really a file we need to get the dirname
+ .dirname = std.fs.path.dirname(cloned_path) orelse cloned_path,
+ .hash = Watcher.getHash(cloned_path),
+ .refs = 1,
+ };
+ _ = try this.file_paths.put(cloned_path, result);
+ return result;
+ } else {
+ return err;
+ }
}
- _ = try this.file_paths.put(cloned_path, result);
- return result;
+ unreachable;
}
pub fn init(vm: *JSC.VirtualMachine) !*PathWatcherManager {
@@ -102,6 +106,7 @@ pub const PathWatcherManager = struct {
errdefer watchers.deinitWithAllocator(bun.default_allocator);
var manager = PathWatcherManager{
.file_paths = bun.StringHashMap(PathInfo).init(bun.default_allocator),
+ .current_fd_task = bun.FDHashMap(*DirectoryRegisterTask).init(bun.default_allocator),
.watchers = watchers,
.main_watcher = try Watcher.init(
this,
@@ -142,6 +147,7 @@ pub const PathWatcherManager = struct {
const watchers = this.watchers.slice();
for (events) |event| {
+ if (event.index >= file_paths.len) continue;
const file_path = file_paths[event.index];
const update_count = counts[event.index] + 1;
counts[event.index] = update_count;
@@ -168,6 +174,9 @@ pub const PathWatcherManager = struct {
for (watchers) |w| {
if (w) |watcher| {
+ if (comptime Environment.isMac) {
+ if (watcher.fsevents_watcher != null) continue;
+ }
const entry_point = watcher.path.dirname;
var path = file_path;
@@ -238,6 +247,9 @@ pub const PathWatcherManager = struct {
const event_type: PathWatcher.EventType = .rename; // renaming folders and creating folders or files will always be reported as rename
for (watchers) |w| {
if (w) |watcher| {
+ if (comptime Environment.isMac) {
+ if (watcher.fsevents_watcher != null) continue;
+ }
const entry_point = watcher.path.dirname;
var path = path_slice;
@@ -297,6 +309,7 @@ pub const PathWatcherManager = struct {
// stop all watchers
for (watchers) |w| {
if (w) |watcher| {
+ log("[watch] error: {s}", .{@errorName(err)});
watcher.emit(@errorName(err), 0, timestamp, false, .@"error");
watcher.flush();
}
@@ -312,47 +325,176 @@ pub const PathWatcherManager = struct {
this.deinit();
}
- fn addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo, buf: *[bun.MAX_PATH_BYTES + 1]u8) !void {
- const fd = path.fd;
- try this.main_watcher.addDirectory(fd, path.path, path.hash, false);
+ pub const DirectoryRegisterTask = struct {
+ manager: *PathWatcherManager,
+ path: PathInfo,
+ task: JSC.WorkPoolTask = .{ .callback = callback },
+ watcher_list: bun.BabyList(*PathWatcher) = .{},
- var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{
- .fd = fd,
- } }).iterate();
+ pub fn callback(task: *JSC.WorkPoolTask) void {
+ var routine = @fieldParentPtr(@This(), "task", task);
+ defer routine.deinit();
+ routine.run();
+ }
- while (try iter.next()) |entry| {
- var parts = [2]string{ path.path, entry.name };
- var entry_path = Path.joinAbsStringBuf(
- Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
- buf,
- &parts,
- .auto,
- );
+ fn schedule(manager: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void {
+ manager.mutex.lock();
+ defer manager.mutex.unlock();
+ // keep the path alive
+ manager._incrementPathRefNoLock(path.path);
+ errdefer manager._decrementPathRef(path.path);
+
+ // use the same thread for the same fd to avoid race conditions
+ if (manager.current_fd_task.getEntry(path.fd)) |entry| {
+ var routine = entry.value_ptr.*;
+ watcher.mutex.lock();
+ defer watcher.mutex.unlock();
+ watcher.pending_directories += 1;
+ routine.watcher_list.push(bun.default_allocator, watcher) catch |err| {
+ watcher.pending_directories -= 1;
+ return err;
+ };
+ return;
+ }
+ var routine = try bun.default_allocator.create(DirectoryRegisterTask);
+ routine.* = DirectoryRegisterTask{
+ .manager = manager,
+ .path = path,
+ .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| {
+ bun.default_allocator.destroy(routine);
+ return err;
+ },
+ };
+ errdefer routine.deinit();
+ try routine.watcher_list.push(bun.default_allocator, watcher);
+ watcher.mutex.lock();
+ defer watcher.mutex.unlock();
+ watcher.pending_directories += 1;
+
+ manager.current_fd_task.put(path.fd, routine) catch |err| {
+ watcher.pending_directories -= 1;
+ return err;
+ };
+ manager.pending_tasks += 1;
+ JSC.WorkPool.schedule(&routine.task);
+ return;
+ }
- buf[entry_path.len] = 0;
- var entry_path_z = buf[0..entry_path.len :0];
+ fn getNext(this: *DirectoryRegisterTask) ?*PathWatcher {
+ this.manager.mutex.lock();
+ defer this.manager.mutex.unlock();
- var child_path = try this._fdFromAbsolutePathZ(entry_path_z);
- errdefer this._decrementPathRef(entry_path_z);
- try watcher.file_paths.push(bun.default_allocator, child_path.path);
+ const watcher = this.watcher_list.popOrNull();
+ if (watcher == null) {
+ // no more work to do, release the fd and path
+ _ = this.manager.current_fd_task.remove(this.path.fd);
+ this.manager._decrementPathRefNoLock(this.path.path);
+ return null;
+ }
+ return watcher;
+ }
- if (child_path.is_file) {
- try this.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false);
- } else {
- if (watcher.recursive) {
- try this.addDirectory(watcher, child_path, buf);
+ fn processWatcher(
+ this: *DirectoryRegisterTask,
+ watcher: *PathWatcher,
+ buf: *[bun.MAX_PATH_BYTES + 1]u8,
+ ) !void {
+ const manager = this.manager;
+ const path = this.path;
+ const fd = path.fd;
+ var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{
+ .fd = fd,
+ } }).iterate();
+ defer {
+ watcher.mutex.lock();
+ watcher.pending_directories -= 1;
+
+ if (watcher.pending_directories == 0 and watcher.finalized) {
+ watcher.mutex.unlock();
+ watcher.deinit();
+ } else {
+ watcher.mutex.unlock();
+ }
+ }
+
+ // now we iterate over all files and directories
+ while (try iter.next()) |entry| {
+ var parts = [2]string{ path.path, entry.name };
+ var entry_path = Path.joinAbsStringBuf(
+ Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(),
+ buf,
+ &parts,
+ .auto,
+ );
+
+ buf[entry_path.len] = 0;
+ var entry_path_z = buf[0..entry_path.len :0];
+
+ var child_path = try manager._fdFromAbsolutePathZ(entry_path_z);
+ watcher.mutex.lock();
+ watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
+ watcher.mutex.unlock();
+ manager._decrementPathRef(entry_path_z);
+ return err;
+ };
+ watcher.mutex.unlock();
+
+ // we need to call this unlocked
+ if (child_path.is_file) {
+ try manager.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false);
+ } else {
+ if (watcher.recursive and !watcher.finalized) {
+ // this may schedule work on another thread, which is desirable when watching large directory trees
+ try manager._addDirectory(watcher, child_path);
+ }
}
}
}
+
+ fn run(this: *DirectoryRegisterTask) void {
+ var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
+
+ while (this.getNext()) |watcher| {
+ this.processWatcher(watcher, &buf) catch |err| {
+ log("[watch] error registering directory: {s}", .{@errorName(err)});
+ watcher.emit(@errorName(err), 0, std.time.milliTimestamp(), false, .@"error");
+ watcher.flush();
+ };
+ }
+
+ this.manager.mutex.lock();
+ this.manager.pending_tasks -= 1;
+ if (this.manager.deinit_on_last_task and this.manager.pending_tasks == 0) {
+ this.manager.mutex.unlock();
+ this.manager.deinit();
+ } else {
+ this.manager.mutex.unlock();
+ }
+ }
+
+ fn deinit(this: *DirectoryRegisterTask) void {
+ bun.default_allocator.destroy(this);
+ }
+ };
+
+ // this should only be called if thread pool is not null
+ fn _addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void {
+ const fd = path.fd;
+ try this.main_watcher.addDirectory(fd, path.path, path.hash, false);
+
+ return try DirectoryRegisterTask.schedule(this, watcher, path);
}
fn registerWatcher(this: *PathWatcherManager, watcher: *PathWatcher) !void {
this.mutex.lock();
- defer this.mutex.unlock();
if (this.watcher_count == this.watchers.len) {
this.watcher_count += 1;
- this.watchers.push(bun.default_allocator, watcher) catch unreachable;
+ this.watchers.push(bun.default_allocator, watcher) catch |err| {
+ this.watcher_count -= 1;
+ this.mutex.unlock();
+ return err;
+ };
} else {
var watchers = this.watchers.slice();
for (watchers, 0..) |w, i| {
@@ -363,16 +505,32 @@ pub const PathWatcherManager = struct {
}
}
}
+
+ this.mutex.unlock();
+
const path = watcher.path;
if (path.is_file) {
try this.main_watcher.addFile(path.fd, path.path, path.hash, options.Loader.file, 0, null, false);
} else {
- var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
- try this.addDirectory(watcher, path, &buf);
+ if (comptime Environment.isMac) {
+ if (watcher.fsevents_watcher != null) {
+ return;
+ }
+ }
+ try this._addDirectory(watcher, path);
}
}
- fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void {
+ fn _incrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void {
+ if (this.file_paths.getEntry(file_path)) |entry| {
+ var path = entry.value_ptr;
+ if (path.refs > 0) {
+ path.refs += 1;
+ }
+ }
+ }
+
+ fn _decrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void {
if (this.file_paths.getEntry(file_path)) |entry| {
var path = entry.value_ptr;
if (path.refs > 0) {
@@ -387,6 +545,12 @@ pub const PathWatcherManager = struct {
}
}
+ fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void {
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ this._decrementPathRefNoLock(file_path);
+ }
+
fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
@@ -408,9 +572,18 @@ pub const PathWatcherManager = struct {
}
this.watcher_count -= 1;
+ this._decrementPathRefNoLock(watcher.path.path);
+ if (comptime Environment.isMac) {
+ if (watcher.fsevents_watcher != null) {
+ break;
+ }
+ }
+
+ watcher.mutex.lock();
while (watcher.file_paths.popOrNull()) |file_path| {
- this._decrementPathRef(file_path);
+ this._decrementPathRefNoLock(file_path);
}
+ watcher.mutex.unlock();
break;
}
}
@@ -432,6 +605,12 @@ pub const PathWatcherManager = struct {
return;
}
+ if (this.pending_tasks > 0) {
+ // deinit when all tasks are done
+ this.deinit_on_last_task = true;
+ return;
+ }
+
this.main_watcher.deinit(false);
if (this.watcher_count > 0) {
@@ -454,7 +633,8 @@ pub const PathWatcherManager = struct {
this.file_paths.deinit();
this.watchers.deinitWithAllocator(bun.default_allocator);
- // this.mutex.deinit();
+
+ this.current_fd_task.deinit();
bun.default_allocator.destroy(this);
}
@@ -471,7 +651,13 @@ pub const PathWatcher = struct {
// all watched file paths (including subpaths) except the path itself
file_paths: bun.BabyList([:0]const u8) = .{},
last_change_event: ChangeEvent = .{},
-
+ // on macOS we use this to watch for changes on directories and subdirectories
+ fsevents_watcher: ?*FSEvents.FSEventsWatcher,
+ mutex: Mutex,
+ pending_directories: u32 = 0,
+ finalized: bool = false,
+ // only used on macOS
+ resolved_path: ?string = null,
pub const ChangeEvent = struct {
hash: PathWatcherManager.Watcher.HashType = 0,
event_type: EventType = .change,
@@ -488,13 +674,58 @@ pub const PathWatcher = struct {
pub fn init(manager: *PathWatcherManager, path: PathWatcherManager.PathInfo, recursive: bool, callback: Callback, updateEndCallback: UpdateEndCallback, ctx: ?*anyopaque) !*PathWatcher {
var this = try bun.default_allocator.create(PathWatcher);
+
+ if (comptime Environment.isMac) {
+ if (!path.is_file) {
+ var buffer: [bun.MAX_PATH_BYTES]u8 = undefined;
+ const resolved_path_temp = std.os.getFdPath(path.fd, &buffer) catch |err| {
+ bun.default_allocator.destroy(this);
+ return err;
+ };
+ const resolved_path = bun.default_allocator.dupeZ(u8, resolved_path_temp) catch |err| {
+ bun.default_allocator.destroy(this);
+ return err;
+ };
+ this.resolved_path = resolved_path;
+ this.* = PathWatcher{
+ .path = path,
+ .callback = callback,
+ .fsevents_watcher = FSEvents.watch(
+ resolved_path,
+ recursive,
+ bun.cast(FSEvents.FSEventsWatcher.Callback, callback),
+ bun.cast(FSEvents.FSEventsWatcher.UpdateEndCallback, updateEndCallback),
+ bun.cast(*anyopaque, ctx),
+ ) catch |err| {
+ bun.default_allocator.destroy(this);
+ return err;
+ },
+ .manager = manager,
+ .recursive = recursive,
+ .flushCallback = updateEndCallback,
+ .file_paths = .{},
+ .ctx = ctx,
+ .mutex = Mutex.init(),
+ };
+
+ errdefer this.deinit();
+
+ // TODO: better unify FSEvents with PathWatcherManager
+ try manager.registerWatcher(this);
+
+ return this;
+ }
+ }
+
this.* = PathWatcher{
+ .fsevents_watcher = null,
.path = path,
.callback = callback,
.manager = manager,
.recursive = recursive,
.flushCallback = updateEndCallback,
.ctx = ctx,
+ .mutex = Mutex.init(),
.file_paths = bun.BabyList([:0]const u8).initCapacity(bun.default_allocator, 1) catch |err| {
bun.default_allocator.destroy(this);
return err;
@@ -525,10 +756,36 @@ pub const PathWatcher = struct {
}
pub fn deinit(this: *PathWatcher) void {
+ this.mutex.lock();
+ this.finalized = true;
+ if (this.pending_directories > 0) {
+ // will be freed on last directory
+ this.mutex.unlock();
+ return;
+ }
+ this.mutex.unlock();
+
if (this.manager) |manager| {
- manager.unregisterWatcher(this);
+ if (comptime Environment.isMac) {
+ if (this.fsevents_watcher) |watcher| {
+ // first unregister on FSEvents
+ watcher.deinit();
+ manager.unregisterWatcher(this);
+ } else {
+ manager.unregisterWatcher(this);
+ this.file_paths.deinitWithAllocator(bun.default_allocator);
+ }
+ } else {
+ manager.unregisterWatcher(this);
+ this.file_paths.deinitWithAllocator(bun.default_allocator);
+ }
+ }
+
+ if (comptime Environment.isMac) {
+ if (this.resolved_path) |path| {
+ bun.default_allocator.free(path);
+ }
}
- this.file_paths.deinitWithAllocator(bun.default_allocator);
bun.default_allocator.destroy(this);
}
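
The new DirectoryRegisterTask moves recursive directory registration onto the work pool and coalesces watchers that target the same file descriptor: the first request for an fd creates a task, and later requests join its watcher_list through current_fd_task. A rough TypeScript sketch of that per-key coalescing idea (types and names here are illustrative; the authoritative logic is the Zig above):

```ts
type Job<T> = { queue: T[] };

class PerKeyCoalescer<K, T> {
  private jobs = new Map<K, Job<T>>();

  constructor(private process: (key: K, item: T) => Promise<void>) {}

  // Enqueue work for `key`; reuse the in-flight job if one exists,
  // mirroring how current_fd_task reuses the task registered for an fd.
  schedule(key: K, item: T): void {
    const existing = this.jobs.get(key);
    if (existing) {
      existing.queue.push(item);
      return;
    }
    const job: Job<T> = { queue: [item] };
    this.jobs.set(key, job);
    void this.drain(key, job);
  }

  private async drain(key: K, job: Job<T>): Promise<void> {
    let next: T | undefined;
    while ((next = job.queue.shift()) !== undefined) {
      await this.process(key, next);
    }
    // No more work for this key: drop the entry, like removing the fd
    // from current_fd_task once the watcher list is empty.
    this.jobs.delete(key);
  }
}
```

Serializing per key reflects the comment in schedule(): reusing the same task for a given fd keeps two workers from iterating one directory handle at the same time.
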
diff --git a/src/bun.zig b/src/bun.zig
index 494e49cb4..8dd888b69 100644
--- a/src/bun.zig
+++ b/src/bun.zig
@@ -729,6 +729,34 @@ pub fn getenvZ(path_: [:0]const u8) ?[]const u8 {
return sliceTo(ptr, 0);
}
+//TODO: add windows support
+pub const FDHashMapContext = struct {
+ pub fn hash(_: @This(), fd: FileDescriptor) u64 {
+ return @as(u64, @intCast(fd));
+ }
+ pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool {
+ return a == b;
+ }
+ pub fn pre(input: FileDescriptor) Prehashed {
+ return Prehashed{
+ .value = @This().hash(.{}, input),
+ .input = input,
+ };
+ }
+
+ pub const Prehashed = struct {
+ value: u64,
+ input: FileDescriptor,
+ pub fn hash(this: @This(), fd: FileDescriptor) u64 {
+ if (fd == this.input) return this.value;
+ return @as(u64, @intCast(fd));
+ }
+
+ pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool {
+ return a == b;
+ }
+ };
+};
// These wrappers exist to use our strings.eqlLong function
pub const StringArrayHashMapContext = struct {
pub fn hash(_: @This(), s: []const u8) u32 {
@@ -831,6 +859,10 @@ pub fn StringHashMapUnmanaged(comptime Type: type) type {
return std.HashMapUnmanaged([]const u8, Type, StringHashMapContext, std.hash_map.default_max_load_percentage);
}
+pub fn FDHashMap(comptime Type: type) type {
+ return std.HashMap(StoredFileDescriptorType, Type, FDHashMapContext, std.hash_map.default_max_load_percentage);
+}
+
const CopyFile = @import("./copy_file.zig");
pub const copyFileRange = CopyFile.copyFileRange;
pub const copyFile = CopyFile.copyFile;
diff --git a/src/js/node/fs.promises.ts b/src/js/node/fs.promises.ts
index 0bf6eb9b2..bdbacd27d 100644
--- a/src/js/node/fs.promises.ts
+++ b/src/js/node/fs.promises.ts
@@ -2,6 +2,7 @@
// Note: `constants` is injected into the top of this file
declare var constants: typeof import("node:fs/promises").constants;
+const { createFIFO } = $lazy("primordials");
var fs = Bun.fs();
@@ -26,7 +27,7 @@ export function watch(
eventType: string;
filename: string | Buffer | undefined;
};
- const events: Array<Event> = [];
+
if (filename instanceof URL) {
throw new TypeError("Watch URLs are not supported yet");
} else if (Buffer.isBuffer(filename)) {
@@ -38,32 +39,55 @@ export function watch(
if (typeof options === "string") {
options = { encoding: options };
}
- fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => {
- events.push({ eventType, filename });
+ const queue = createFIFO();
+
+ const watcher = fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => {
+ queue.push({ eventType, filename });
if (nextEventResolve) {
const resolve = nextEventResolve;
nextEventResolve = null;
resolve();
}
});
+
return {
- async *[Symbol.asyncIterator]() {
+ [Symbol.asyncIterator]() {
let closed = false;
- while (!closed) {
- while (events.length) {
- let event = events.shift() as Event;
- if (event.eventType === "close") {
- closed = true;
- break;
+ return {
+ async next() {
+ while (!closed) {
+ let event: Event;
+ while ((event = queue.shift() as Event)) {
+ if (event.eventType === "close") {
+ closed = true;
+ return { value: undefined, done: true };
+ }
+ if (event.eventType === "error") {
+ closed = true;
+ throw event.filename;
+ }
+ return { value: event, done: false };
+ }
+ const { promise, resolve } = Promise.withResolvers();
+ nextEventResolve = resolve;
+ await promise;
}
- if (event.eventType === "error") {
+ return { value: undefined, done: true };
+ },
+
+ return() {
+ if (!closed) {
+ watcher.close();
closed = true;
- throw event.filename;
+ if (nextEventResolve) {
+ const resolve = nextEventResolve;
+ nextEventResolve = null;
+ resolve();
+ }
}
- yield event;
- }
- await new Promise((resolve: Function) => (nextEventResolve = resolve));
- }
+ return { value: undefined, done: true };
+ },
+ };
},
};
}
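
The rewritten iterator replaces the async generator with an explicit next()/return() pair: next() drains a FIFO of events and, when it runs dry, parks on a promise created with Promise.withResolvers() that the watcher callback later resolves. A stripped-down sketch of that producer/consumer handshake (the queue and event shape are illustrative, and Promise.withResolvers may need a polyfill on older runtimes):

```ts
type Event = { eventType: string; filename?: string };

const queue: Event[] = [];
let wake: (() => void) | null = null;

// Producer side: called by the watcher callback.
function push(event: Event): void {
  queue.push(event);
  if (wake) {
    const resolve = wake;
    wake = null;
    resolve();
  }
}

// Consumer side: the iterator's next() drains the queue, then waits.
async function next(): Promise<Event> {
  for (;;) {
    const event = queue.shift();
    if (event) return event;
    const { promise, resolve } = Promise.withResolvers<void>();
    wake = resolve;
    await promise;
  }
}
```

Keeping at most one parked resolver matches the diff, where nextEventResolve is cleared before it is called, so any event that arrives afterwards simply waits in the queue for the next call to next().
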
diff --git a/src/js/out/modules/node/fs.promises.js b/src/js/out/modules/node/fs.promises.js
index 852836098..df383069b 100644
--- a/src/js/out/modules/node/fs.promises.js
+++ b/src/js/out/modules/node/fs.promises.js
@@ -1 +1 @@
-var o=(S)=>{return import.meta.require(S)};function G(S,U={}){const A=[];if(S instanceof URL)throw new TypeError("Watch URLs are not supported yet");else if(Buffer.isBuffer(S))S=S.toString();else if(typeof S!=="string")throw new TypeError("Expected path to be a string or Buffer");let z=null;if(typeof U==="string")U={encoding:U};return C.watch(S,U||{},(q,g)=>{if(A.push({eventType:q,filename:g}),z){const B=z;z=null,B()}}),{async*[Symbol.asyncIterator](){let q=!1;while(!q){while(A.length){let g=A.shift();if(g.eventType==="close"){q=!0;break}if(g.eventType==="error")throw q=!0,g.filename;yield g}await new Promise((g)=>z=g)}}}}var C=Bun.fs(),D="::bunternal::",J={[D]:(S)=>{return async function(...U){return await 1,S.apply(C,U)}}}[D],H=J(C.accessSync),I=J(C.appendFileSync),K=J(C.closeSync),L=J(C.copyFileSync),M=J(C.existsSync),N=J(C.chownSync),O=J(C.chmodSync),P=J(C.fchmodSync),Q=J(C.fchownSync),V=J(C.fstatSync),X=J(C.fsyncSync),Y=J(C.ftruncateSync),Z=J(C.futimesSync),_=J(C.lchmodSync),$=J(C.lchownSync),T=J(C.linkSync),W=C.lstat.bind(C),j=J(C.mkdirSync),x=J(C.mkdtempSync),E=J(C.openSync),F=J(C.readSync),k=J(C.writeSync),R=C.readdir.bind(C),w=C.readFile.bind(C),h=J(C.writeFileSync),b=J(C.readlinkSync),u=J(C.realpathSync),d=J(C.renameSync),c=C.stat.bind(C),v=J(C.symlinkSync),a=J(C.truncateSync),l=J(C.unlinkSync),y=J(C.utimesSync),p=J(C.lutimesSync),m=J(C.rmSync),n=J(C.rmdirSync),t=(S,U,A)=>{return new Promise((z,q)=>{try{var g=C.writevSync(S,U,A)}catch(B){q(B);return}z({bytesWritten:g,buffers:U})})},r=(S,U,A)=>{return new Promise((z,q)=>{try{var g=C.readvSync(S,U,A)}catch(B){q(B);return}z({bytesRead:g,buffers:U})})},i={access:H,appendFile:I,close:K,copyFile:L,exists:M,chown:N,chmod:O,fchmod:P,fchown:Q,fstat:V,fsync:X,ftruncate:Y,futimes:Z,lchmod:_,lchown:$,link:T,lstat:W,mkdir:j,mkdtemp:x,open:E,read:F,write:k,readdir:R,readFile:w,writeFile:h,readlink:b,realpath:u,rename:d,stat:c,symlink:v,truncate:a,unlink:l,utimes:y,lutimes:p,rm:m,rmdir:n,watch:G,writev:t,readv:r,constants,[Symbol.for("CommonJS")]:0};export{t as writev,h as writeFile,k as write,G as watch,y as utimes,l as unlink,a as truncate,v as symlink,c as stat,n as rmdir,m as rm,d as rename,u as realpath,r as readv,b as readlink,R as readdir,w as readFile,F as read,E as open,x as mkdtemp,j as mkdir,p as lutimes,W as lstat,T as link,$ as lchown,_ as lchmod,Z as futimes,Y as ftruncate,X as fsync,V as fstat,Q as fchown,P as fchmod,M as exists,i as default,L as copyFile,K as close,N as chown,O as chmod,I as appendFile,H as access};
+var s=(z)=>{return import.meta.require(z)};function N(z,B={}){if(z instanceof URL)throw new TypeError("Watch URLs are not supported yet");else if(Buffer.isBuffer(z))z=z.toString();else if(typeof z!=="string")throw new TypeError("Expected path to be a string or Buffer");let C=null;if(typeof B==="string")B={encoding:B};const G=M(),H=S.watch(z,B||{},(D,A)=>{if(G.push({eventType:D,filename:A}),C){const I=C;C=null,I()}});return{[Symbol.asyncIterator](){let D=!1;return{async next(){while(!D){let A;while(A=G.shift()){if(A.eventType==="close")return D=!0,{value:void 0,done:!0};if(A.eventType==="error")throw D=!0,A.filename;return{value:A,done:!1}}const{promise:I,resolve:L}=Promise.withResolvers();C=L,await I}return{value:void 0,done:!0}},return(){if(!D){if(H.close(),D=!0,C){const A=C;C=null,A()}}return{value:void 0,done:!0}}}}}}var{createFIFO:M}=globalThis[Symbol.for("Bun.lazy")]("primordials"),S=Bun.fs(),K="::bunternal::",J={[K]:(z)=>{return async function(...B){return await 1,z.apply(S,B)}}}[K],P=J(S.accessSync),Q=J(S.appendFileSync),U=J(S.closeSync),V=J(S.copyFileSync),X=J(S.existsSync),Y=J(S.chownSync),Z=J(S.chmodSync),_=J(S.fchmodSync),$=J(S.fchownSync),q=J(S.fstatSync),O=J(S.fsyncSync),g=J(S.ftruncateSync),T=J(S.futimesSync),W=J(S.lchmodSync),j=J(S.lchownSync),k=J(S.linkSync),E=S.lstat.bind(S),h=J(S.mkdirSync),w=J(S.mkdtempSync),x=J(S.openSync),F=J(S.readSync),R=J(S.writeSync),b=S.readdir.bind(S),u=S.readFile.bind(S),d=J(S.writeFileSync),c=J(S.readlinkSync),v=J(S.realpathSync),a=J(S.renameSync),y=S.stat.bind(S),l=J(S.symlinkSync),p=J(S.truncateSync),m=J(S.unlinkSync),n=J(S.utimesSync),t=J(S.lutimesSync),r=J(S.rmSync),o=J(S.rmdirSync),f=(z,B,C)=>{return new Promise((G,H)=>{try{var D=S.writevSync(z,B,C)}catch(A){H(A);return}G({bytesWritten:D,buffers:B})})},i=(z,B,C)=>{return new Promise((G,H)=>{try{var D=S.readvSync(z,B,C)}catch(A){H(A);return}G({bytesRead:D,buffers:B})})},SS={access:P,appendFile:Q,close:U,copyFile:V,exists:X,chown:Y,chmod:Z,fchmod:_,fchown:$,fstat:q,fsync:O,ftruncate:g,futimes:T,lchmod:W,lchown:j,link:k,lstat:E,mkdir:h,mkdtemp:w,open:x,read:F,write:R,readdir:b,readFile:u,writeFile:d,readlink:c,realpath:v,rename:a,stat:y,symlink:l,truncate:p,unlink:m,utimes:n,lutimes:t,rm:r,rmdir:o,watch:N,writev:f,readv:i,constants,[Symbol.for("CommonJS")]:0};export{f as writev,d as writeFile,R as write,N as watch,n as utimes,m as unlink,p as truncate,l as symlink,y as stat,o as rmdir,r as rm,a as rename,v as realpath,i as readv,c as readlink,b as readdir,u as readFile,F as read,x as open,w as mkdtemp,h as mkdir,t as lutimes,E as lstat,k as link,j as lchown,W as lchmod,T as futimes,g as ftruncate,O as fsync,q as fstat,$ as fchown,_ as fchmod,X as exists,SS as default,V as copyFile,U as close,Y as chown,Z as chmod,Q as appendFile,P as access};
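
From the caller's perspective, the change makes fs.promises.watch cleanly consumable with for await...of: leaving the loop early now invokes the iterator's return(), which closes the underlying watcher instead of leaving it running. A small usage sketch (the path and the stop condition are illustrative):

```ts
import { watch } from "node:fs/promises";

// Exiting the loop via break, return, or a thrown error triggers the
// iterator's return(), and return() calls watcher.close().
for await (const event of watch("./src")) {
  console.log(event.eventType, event.filename);
  if (event.filename === "stop.txt") break;
}
```
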