aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/node
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/node')
-rw-r--r--src/bun.js/node/node.classes.ts31
-rw-r--r--src/bun.js/node/node_fs.zig23
-rw-r--r--src/bun.js/node/node_fs_binding.zig2
-rw-r--r--src/bun.js/node/node_fs_stat_watcher.zig503
4 files changed, 553 insertions, 6 deletions
diff --git a/src/bun.js/node/node.classes.ts b/src/bun.js/node/node.classes.ts
index 695a58d5e..46c25ad06 100644
--- a/src/bun.js/node/node.classes.ts
+++ b/src/bun.js/node/node.classes.ts
@@ -31,6 +31,31 @@ export default [
values: ["listener"],
}),
define({
+ name: "StatWatcher",
+ construct: false,
+ noConstructor: true,
+ finalize: true,
+ configurable: false,
+ hasPendingActivity: true,
+ klass: {},
+ JSType: "0b11101110",
+ proto: {
+ ref: {
+ fn: "doRef",
+ length: 0,
+ },
+ unref: {
+ fn: "doUnref",
+ length: 0,
+ },
+ close: {
+ fn: "doClose",
+ length: 0,
+ },
+ },
+ values: ["listener"],
+ }),
+ define({
name: "Timeout",
construct: false,
noConstructor: true,
@@ -471,15 +496,13 @@ export default [
symlinkSync: { fn: "symlinkSync", length: 3 },
truncate: { fn: "truncate", length: 3 },
truncateSync: { fn: "truncateSync", length: 2 },
- // TODO:
- // unwatchFile: { fn: "unwatchFile", length: 2 },
+ unwatchFile: { fn: "unwatchFile", length: 2 },
unlink: { fn: "unlink", length: 2 },
unlinkSync: { fn: "unlinkSync", length: 1 },
utimes: { fn: "utimes", length: 4 },
utimesSync: { fn: "utimesSync", length: 3 },
watch: { fn: "watch", length: 3 },
- // TODO:
- // watchFile: { fn: "watchFile", length: 3 },
+ watchFile: { fn: "watchFile", length: 3 },
writeFile: { fn: "writeFile", length: 4 },
writeFileSync: { fn: "writeFileSync", length: 3 },
write: { fn: "write", length: 6 },
diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig
index 5407c2d5c..4f9e2bf68 100644
--- a/src/bun.js/node/node_fs.zig
+++ b/src/bun.js/node/node_fs.zig
@@ -3177,7 +3177,7 @@ pub const Arguments = struct {
pub const UnwatchFile = void;
pub const Watch = JSC.Node.FSWatcher.Arguments;
- pub const WatchFile = void;
+ pub const WatchFile = JSC.Node.StatWatcher.Arguments;
pub const Fsync = struct {
fd: FileDescriptor,
@@ -3362,7 +3362,7 @@ const Return = struct {
pub const Unlink = void;
pub const UnwatchFile = void;
pub const Watch = JSC.JSValue;
- pub const WatchFile = void;
+ pub const WatchFile = JSC.JSValue;
pub const Utimes = void;
pub const Chown = void;
@@ -5245,6 +5245,25 @@ pub const NodeFS = struct {
return Maybe(Return.Unlink).todo;
}
+ pub fn watchFile(_: *NodeFS, args: Arguments.WatchFile, comptime flavor: Flavor) Maybe(Return.WatchFile) {
+ std.debug.assert(flavor == .sync);
+
+ if (comptime Environment.isWindows) {
+ args.global_this.throwTODO("watch is not supported on Windows yet");
+ return Maybe(Return.Watch){ .result = JSC.JSValue.undefined };
+ }
+
+ const watcher = args.createStatWatcher() catch |err| {
+ var buf = std.fmt.allocPrint(bun.default_allocator, "{s} watching {}", .{ @errorName(err), strings.QuotedFormatter{ .text = args.path.slice() } }) catch unreachable;
+ defer bun.default_allocator.free(buf);
+ args.global_this.throwValue((JSC.SystemError{
+ .message = bun.String.init(buf),
+ .path = bun.String.init(args.path.slice()),
+ }).toErrorInstance(args.global_this));
+ return Maybe(Return.Watch){ .result = JSC.JSValue.undefined };
+ };
+ return Maybe(Return.Watch){ .result = watcher };
+ }
pub fn unwatchFile(_: *NodeFS, _: Arguments.UnwatchFile, comptime _: Flavor) Maybe(Return.UnwatchFile) {
return Maybe(Return.UnwatchFile).todo;
}
diff --git a/src/bun.js/node/node_fs_binding.zig b/src/bun.js/node/node_fs_binding.zig
index 16128be44..b3866b557 100644
--- a/src/bun.js/node/node_fs_binding.zig
+++ b/src/bun.js/node/node_fs_binding.zig
@@ -299,6 +299,8 @@ pub const NodeJSFS = struct {
}
pub const watch = callSync(.watch);
+ pub const watchFile = callSync(.watchFile);
+ pub const unwatchFile = callSync(.unwatchFile);
// Not implemented yet:
const notimpl = fdatasync;
diff --git a/src/bun.js/node/node_fs_stat_watcher.zig b/src/bun.js/node/node_fs_stat_watcher.zig
new file mode 100644
index 000000000..c2690c200
--- /dev/null
+++ b/src/bun.js/node/node_fs_stat_watcher.zig
@@ -0,0 +1,503 @@
+const std = @import("std");
+const JSC = @import("root").bun.JSC;
+const bun = @import("root").bun;
+const Fs = @import("../../fs.zig");
+const Path = @import("../../resolver/resolve_path.zig");
+const Encoder = JSC.WebCore.Encoder;
+const Mutex = @import("../../lock.zig").Lock;
+const uws = @import("../../deps/uws.zig");
+
+const PathWatcher = @import("./path_watcher.zig");
+
+const VirtualMachine = JSC.VirtualMachine;
+const EventLoop = JSC.EventLoop;
+const PathLike = JSC.Node.PathLike;
+const ArgumentsSlice = JSC.Node.ArgumentsSlice;
+const Output = bun.Output;
+const string = bun.string;
+const StoredFileDescriptorType = bun.StoredFileDescriptorType;
+const Environment = bun.Environment;
+
+const StatsSmall = bun.JSC.Node.StatsSmall;
+const StatsBig = bun.JSC.Node.StatsBig;
+
+const log = bun.Output.scoped(.StatWatcher, false);
+
+fn statToJSStats(globalThis: *JSC.JSGlobalObject, stats: bun.Stat, bigint: bool) JSC.JSValue {
+ if (bigint) {
+ return StatsBig.initWithAllocator(globalThis.allocator(), stats).toJS(globalThis);
+ } else {
+ return StatsSmall.initWithAllocator(globalThis.allocator(), stats).toJS(globalThis);
+ }
+}
+
+/// This is a singleton struct that contains the timer used to schedule restat calls.
+pub const StatWatcherScheduler = struct {
+ timer: ?*uws.Timer = null,
+
+ head: std.atomic.Atomic(?*StatWatcher) = .{ .value = null },
+ is_running: std.atomic.Atomic(bool) = .{ .value = false },
+
+ task: JSC.WorkPoolTask = .{ .callback = &workPoolCallback },
+
+ pub fn init(allocator: std.mem.Allocator, _: *bun.JSC.VirtualMachine) *StatWatcherScheduler {
+ var this = allocator.create(StatWatcherScheduler) catch @panic("out of memory");
+ this.* = .{};
+ return this;
+ }
+
+ pub fn append(this: *StatWatcherScheduler, watcher: *StatWatcher) void {
+ log("append new watcher {s}", .{watcher.path});
+ std.debug.assert(watcher.closed == false);
+ std.debug.assert(watcher.next == null);
+
+ if (this.head.swap(watcher, .Monotonic)) |head| {
+ watcher.next = head;
+ if (!this.is_running.load(.Monotonic)) {
+ this.timer.?.set(this, timerCallback, 1, 0);
+ }
+ } else {
+ if (!this.is_running.load(.Monotonic)) {
+ watcher.last_check = std.time.Instant.now() catch unreachable;
+
+ const vm = watcher.globalThis.bunVM();
+ this.timer = uws.Timer.create(
+ vm.event_loop_handle orelse @panic("UWS Loop was not initialized yet."),
+ this,
+ );
+
+ this.timer.?.set(this, timerCallback, watcher.interval, 0);
+ log("I will wait {d} milli initially", .{watcher.interval});
+ }
+ }
+ }
+
+ pub fn timerCallback(timer: *uws.Timer) callconv(.C) void {
+ var this = timer.ext(StatWatcherScheduler).?;
+ this.is_running.store(true, .Monotonic);
+ JSC.WorkPool.schedule(&this.task);
+ }
+
+ pub fn workPoolCallback(task: *JSC.WorkPoolTask) void {
+ var this: *StatWatcherScheduler = @fieldParentPtr(StatWatcherScheduler, "task", task);
+ // Instant.now will not fail on our target platforms.
+ const now = std.time.Instant.now() catch unreachable;
+
+ var head: *StatWatcher = this.head.swap(null, .Monotonic).?;
+
+ var prev = head;
+ while (prev.closed) {
+ var c = prev;
+ defer {
+ c.used_by_scheduler_thread = false;
+ }
+
+ log("[1] removing closed watcher for '{s}'", .{prev.path});
+ if (prev.next) |next| {
+ prev = next;
+ } else {
+ if (this.head.load(.Monotonic) == null) {
+ this.timer.?.deinit();
+ this.timer = null;
+ // The scheduler is not deinit here, but it will get reused.
+ }
+ return;
+ }
+ }
+
+ if (now.since(prev.last_check) > (@as(u64, @intCast(prev.interval)) * 1_000_000 -| 500)) {
+ prev.last_check = now;
+ prev.restat();
+ }
+ var min_interval = prev.interval;
+
+ var curr: ?*StatWatcher = prev.next;
+ while (curr) |c| : (curr = c.next) {
+ if (c.closed) {
+ log("[2] removing closed watcher for '{s}'", .{c.path});
+ prev.next = c.next;
+ curr = c.next;
+ c.used_by_scheduler_thread = false;
+ continue;
+ }
+ if (now.since(c.last_check) > (@as(u64, @intCast(c.interval)) * 1_000_000 -| 500)) {
+ c.last_check = now;
+ c.restat();
+ }
+ min_interval = @min(min_interval, c.interval);
+ prev = c;
+ curr = c.next;
+ }
+
+ prev.next = this.head.swap(head, .Monotonic);
+
+ log("I will wait {d} milli", .{min_interval});
+
+ this.timer.?.set(this, timerCallback, min_interval, 0);
+ }
+};
+
+pub const StatWatcher = struct {
+ next: ?*StatWatcher = null,
+
+ ctx: *VirtualMachine,
+
+ /// Closed is set to true to tell the scheduler to remove from list and mark `used_by_scheduler_thread` as false.
+ closed: bool,
+ /// When this is marked `false` this StatWatcher can get freed
+ used_by_scheduler_thread: bool,
+
+ path: [:0]u8,
+ persistent: bool,
+ bigint: bool,
+ interval: i32,
+ last_check: std.time.Instant,
+
+ globalThis: *JSC.JSGlobalObject,
+ js_this: JSC.JSValue,
+
+ poll_ref: JSC.PollRef = .{},
+
+ last_stat: bun.Stat,
+ last_jsvalue: JSC.Strong,
+
+ pub usingnamespace JSC.Codegen.JSStatWatcher;
+
+ pub fn eventLoop(this: StatWatcher) *EventLoop {
+ return this.ctx.eventLoop();
+ }
+
+ pub fn enqueueTaskConcurrent(this: StatWatcher, task: *JSC.ConcurrentTask) void {
+ this.eventLoop().enqueueTaskConcurrent(task);
+ }
+
+ pub fn deinit(this: *StatWatcher) void {
+ log("deinit\n", .{});
+ std.debug.assert(!this.hasPendingActivity());
+
+ if (this.persistent) {
+ this.persistent = false;
+ this.poll_ref.unref(this.ctx);
+ }
+ this.closed = true;
+ this.last_jsvalue.clear();
+
+ bun.default_allocator.free(this.path);
+ bun.default_allocator.destroy(this);
+ }
+
+ pub const Arguments = struct {
+ path: PathLike,
+ listener: JSC.JSValue,
+
+ persistent: bool,
+ bigint: bool,
+ interval: i32,
+
+ global_this: JSC.C.JSContextRef,
+
+ pub fn fromJS(ctx: JSC.C.JSContextRef, arguments: *ArgumentsSlice, exception: JSC.C.ExceptionRef) ?Arguments {
+ const vm = ctx.vm();
+ const path = PathLike.fromJS(ctx, arguments, exception) orelse {
+ if (exception.* == null) {
+ JSC.throwInvalidArguments(
+ "filename must be a string or TypedArray",
+ .{},
+ ctx,
+ exception,
+ );
+ }
+ return null;
+ };
+
+ if (exception.* != null) return null;
+
+ var listener: JSC.JSValue = .zero;
+ var persistent: bool = true;
+ var bigint: bool = false;
+ var interval: i32 = 5007;
+
+ if (arguments.nextEat()) |options_or_callable| {
+ // options
+ if (options_or_callable.isObject()) {
+ if (options_or_callable.get(ctx, "persistent")) |persistent_| {
+ if (!persistent_.isBoolean()) {
+ JSC.throwInvalidArguments(
+ "persistent must be a boolean.",
+ .{},
+ ctx,
+ exception,
+ );
+ return null;
+ }
+ persistent = persistent_.toBoolean();
+ }
+
+ if (options_or_callable.get(ctx, "bigint")) |bigint_| {
+ if (!bigint_.isBoolean()) {
+ JSC.throwInvalidArguments(
+ "bigint must be a boolean.",
+ .{},
+ ctx,
+ exception,
+ );
+ return null;
+ }
+ bigint = bigint_.toBoolean();
+ }
+
+ if (options_or_callable.get(ctx, "interval")) |interval_| {
+ if (!interval_.isNumber()) {
+ JSC.throwInvalidArguments(
+ "bigint must be a boolean.",
+ .{},
+ ctx,
+ exception,
+ );
+ return null;
+ }
+ interval = interval_.toInt32(); //*
+ }
+ }
+ }
+
+ if (arguments.nextEat()) |listener_| {
+ if (listener_.isCallable(vm)) {
+ listener = listener_.withAsyncContextIfNeeded(ctx);
+ }
+ }
+
+ if (listener == .zero) {
+ exception.* = JSC.toInvalidArguments("Expected \"listener\" callback", .{}, ctx).asObjectRef();
+ return null;
+ }
+
+ return Arguments{
+ .path = path,
+ .listener = listener,
+ .persistent = persistent,
+ .bigint = bigint,
+ .interval = interval,
+ .global_this = ctx,
+ };
+ }
+
+ pub fn createStatWatcher(this: Arguments) !JSC.JSValue {
+ const obj = try StatWatcher.init(this);
+ if (obj.js_this != .zero) {
+ return obj.js_this;
+ }
+ return JSC.JSValue.jsUndefined();
+ }
+ };
+
+ pub fn doRef(this: *StatWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ if (!this.closed and !this.persistent) {
+ this.persistent = true;
+ this.poll_ref.ref(this.ctx);
+ }
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn doUnref(this: *StatWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ if (this.persistent) {
+ this.persistent = false;
+ this.poll_ref.unref(this.ctx);
+ }
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn hasPendingActivity(this: *StatWatcher) callconv(.C) bool {
+ return this.used_by_scheduler_thread;
+ }
+
+ /// Stops file watching but does not free the instance.
+ pub fn close(
+ this: *StatWatcher,
+ ) void {
+ if (this.persistent) {
+ this.persistent = false;
+ this.poll_ref.unref(this.ctx);
+ }
+ this.closed = true;
+ this.last_jsvalue.clear();
+ }
+
+ pub fn doClose(this: *StatWatcher, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ this.close();
+ return JSC.JSValue.jsUndefined();
+ }
+
+ /// If the scheduler is not using this, free instantly, otherwise mark for being freed.
+ pub fn finalize(this: *StatWatcher) callconv(.C) void {
+ log("Finalize\n", .{});
+ this.deinit();
+ }
+
+ pub const InitialStatTask = struct {
+ watcher: *StatWatcher,
+ task: JSC.WorkPoolTask = .{ .callback = &workPoolCallback },
+
+ pub fn createAndSchedule(
+ watcher: *StatWatcher,
+ ) void {
+ var task = bun.default_allocator.create(InitialStatTask) catch @panic("out of memory");
+ task.* = .{ .watcher = watcher };
+ JSC.WorkPool.schedule(&task.task);
+ }
+
+ fn workPoolCallback(task: *JSC.WorkPoolTask) void {
+ var initial_stat_task: *InitialStatTask = @fieldParentPtr(InitialStatTask, "task", task);
+ defer bun.default_allocator.destroy(initial_stat_task);
+ const this = initial_stat_task.watcher;
+
+ if (this.closed) {
+ this.used_by_scheduler_thread = false;
+ return;
+ }
+
+ const stat = bun.sys.stat(this.path);
+ switch (stat) {
+ .result => |res| {
+ // we store the stat, but do not call the callback
+ this.last_stat = res;
+ this.enqueueTaskConcurrent(JSC.ConcurrentTask.fromCallback(this, initialStatSuccessOnMainThread));
+ },
+ .err => {
+ // on enoent, eperm, we call cb with two zeroed stat objects
+ // and store previous stat as a zeroed stat object, and then call the callback.
+ this.last_stat = std.mem.zeroes(bun.Stat);
+ this.enqueueTaskConcurrent(JSC.ConcurrentTask.fromCallback(this, initialStatErrorOnMainThread));
+ },
+ }
+ }
+ };
+
+ pub fn initialStatSuccessOnMainThread(this: *StatWatcher) void {
+ if (this.closed) {
+ this.used_by_scheduler_thread = false;
+ return;
+ }
+
+ const jsvalue = statToJSStats(this.globalThis, this.last_stat, this.bigint);
+ this.last_jsvalue = JSC.Strong.create(jsvalue, this.globalThis);
+
+ const vm = this.globalThis.bunVM();
+ vm.rareData().nodeFSStatWatcherScheduler(vm).append(this);
+ }
+
+ pub fn initialStatErrorOnMainThread(this: *StatWatcher) void {
+ if (this.closed) {
+ this.used_by_scheduler_thread = false;
+ return;
+ }
+
+ const jsvalue = statToJSStats(this.globalThis, this.last_stat, this.bigint);
+ this.last_jsvalue = JSC.Strong.create(jsvalue, this.globalThis);
+
+ const result = StatWatcher.listenerGetCached(this.js_this).?.call(
+ this.globalThis,
+ &[2]JSC.JSValue{
+ jsvalue,
+ jsvalue,
+ },
+ );
+
+ const vm = this.globalThis.bunVM();
+ if (result.isAnyError()) {
+ vm.onUnhandledError(this.globalThis, result);
+ }
+
+ vm.rareData().nodeFSStatWatcherScheduler(vm).append(this);
+ }
+
+ /// Called from any thread
+ pub fn restat(this: *StatWatcher) void {
+ log("recalling stat", .{});
+ const stat = bun.sys.stat(this.path);
+ const res = switch (stat) {
+ .result => |res| res,
+ .err => std.mem.zeroes(bun.Stat),
+ };
+
+ if (std.mem.eql(u8, std.mem.asBytes(&res), std.mem.asBytes(&this.last_stat))) return;
+
+ this.last_stat = res;
+ this.enqueueTaskConcurrent(JSC.ConcurrentTask.fromCallback(this, swapAndCallListenerOnMainThread));
+ }
+
+ /// After a restat found the file changed, this calls the listener function.
+ pub fn swapAndCallListenerOnMainThread(this: *StatWatcher) void {
+ const prev_jsvalue = this.last_jsvalue.swap();
+ const current_jsvalue = statToJSStats(this.globalThis, this.last_stat, this.bigint);
+ this.last_jsvalue.set(this.globalThis, current_jsvalue);
+
+ const result = StatWatcher.listenerGetCached(this.js_this).?.call(
+ this.globalThis,
+ &[2]JSC.JSValue{
+ current_jsvalue,
+ prev_jsvalue,
+ },
+ );
+ if (result.isAnyError()) {
+ const vm = this.globalThis.bunVM();
+ vm.onUnhandledError(this.globalThis, result);
+ }
+ }
+
+ pub fn onTimerInterval(timer: *uws.Timer) callconv(.C) void {
+ timer.ext(StatWatcher).?.restat();
+ }
+
+ pub fn init(args: Arguments) !*StatWatcher {
+ log("init", .{});
+
+ var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
+ var slice = args.path.slice();
+ if (bun.strings.startsWith(slice, "file://")) {
+ slice = slice[6..];
+ }
+ var parts = [_]string{slice};
+ var file_path = Path.joinAbsStringBuf(
+ Fs.FileSystem.instance.top_level_dir,
+ &buf,
+ &parts,
+ .auto,
+ );
+
+ var alloc_file_path = try bun.default_allocator.allocSentinel(u8, file_path.len, 0);
+ errdefer bun.default_allocator.free(alloc_file_path);
+ @memcpy(alloc_file_path, file_path);
+
+ var this = try bun.default_allocator.create(StatWatcher);
+ const vm = args.global_this.bunVM();
+ this.* = .{
+ .ctx = vm,
+ .persistent = args.persistent,
+ .bigint = args.bigint,
+ .interval = @max(5, args.interval),
+ .globalThis = args.global_this,
+ .js_this = .zero,
+ .closed = false,
+ .path = alloc_file_path,
+ .used_by_scheduler_thread = true,
+ // Instant.now will not fail on our target platforms.
+ .last_check = std.time.Instant.now() catch unreachable,
+ // InitStatTask is responsible for setting this
+ .last_stat = undefined,
+ .last_jsvalue = JSC.Strong.init(),
+ };
+ errdefer this.deinit();
+
+ if (this.persistent) {
+ this.poll_ref.ref(this.ctx);
+ }
+
+ const js_this = StatWatcher.toJS(this, this.globalThis);
+ this.js_this = js_this;
+ StatWatcher.listenerSetCached(js_this, this.globalThis, args.listener);
+ InitialStatTask.createAndSchedule(this);
+
+ return this;
+ }
+};