diff options
author | 2021-06-12 00:49:46 -0700 | |
---|---|---|
committer | 2021-06-12 00:49:46 -0700 | |
commit | f93472101aa7338b3cdfc9db5c936d010f4cda82 (patch) | |
tree | 7c3a0e600037ae1711b3e4de739acdfc1dfdb6c6 | |
parent | 19f4c59b4281bd24130a6898e3b43d9d6d7e6603 (diff) | |
download | bun-f93472101aa7338b3cdfc9db5c936d010f4cda82.tar.gz bun-f93472101aa7338b3cdfc9db5c936d010f4cda82.tar.zst bun-f93472101aa7338b3cdfc9db5c936d010f4cda82.zip |
little kqueue fs watcher
-rw-r--r-- | demos/simple-react/public/index.html | 2 | ||||
-rw-r--r-- | src/api/schema.peechy | 15 | ||||
-rw-r--r-- | src/bundler.zig | 68 | ||||
-rw-r--r-- | src/cache.zig | 15 | ||||
-rw-r--r-- | src/fs.zig | 8 | ||||
-rw-r--r-- | src/global.zig | 4 | ||||
-rw-r--r-- | src/http.zig | 214 | ||||
-rw-r--r-- | src/resolver/package_json.zig | 8 | ||||
-rw-r--r-- | src/resolver/resolver.zig | 1 | ||||
-rw-r--r-- | src/watcher.zig | 232 |
10 files changed, 385 insertions, 182 deletions
diff --git a/demos/simple-react/public/index.html b/demos/simple-react/public/index.html index 724f78f1d..91ced42dd 100644 --- a/demos/simple-react/public/index.html +++ b/demos/simple-react/public/index.html @@ -1,7 +1,7 @@ <!DOCTYPE html> <html> <head> - <script src="/src/index.tsx" type="module"></script> + <script src="./src/index.tsx" type="module"></script> </head> <body> <div id="reactroot"></div> diff --git a/src/api/schema.peechy b/src/api/schema.peechy index 863ed6aa8..257fafd0a 100644 --- a/src/api/schema.peechy +++ b/src/api/schema.peechy @@ -236,16 +236,19 @@ struct Log { // The WebSocket protocol -// Server: "hey, this file changed. Do you want it?" +// Server: "hey, this file changed. Does anyone want it?" // Client: *checks hash table* "uhh yeah, ok. rebuild that for me" // Server: "here u go" // This makes the client responsible for tracking which files it needs to listen for. +// From a server perspective, this means the filesystem watching thread can send the same WebSocket message +// to every client, which is good for performance. It means if you have 5 tabs open it won't really be different than one tab +// The clients can just ignore files they don't care about smol WebsocketMessageType { - visibility_status_change = 1, - build_status_update = 2, - + file_change = 1, + build_success = 2, + build_fail = 3, } -message WebsocketMessageContainer { - +struct WeboscketMessage { + }
\ No newline at end of file diff --git a/src/bundler.zig b/src/bundler.zig index e2905b443..3c1f5155e 100644 --- a/src/bundler.zig +++ b/src/bundler.zig @@ -101,6 +101,7 @@ pub const ParseResult = struct { source: logger.Source, loader: options.Loader, ast: js_ast.Ast, + input_fd: ?StoredFileDescriptorType = null, }; pub const ScanResult = struct { @@ -489,6 +490,7 @@ pub fn NewBundler(cache_files: bool) type { file_path.text, resolve.dirname_fd, true, + null, ); const source = logger.Source.initFile(Fs.File{ .path = file_path, .contents = entry.contents }, bundler.allocator) catch return null; const source_dir = file_path.name.dirWithTrailingSlash(); @@ -743,6 +745,7 @@ pub fn NewBundler(cache_files: bool) type { file_path.text, resolve.dirname_fd, true, + null, ) catch return; const source = logger.Source.initFile(Fs.File{ .path = file_path, .contents = entry.contents }, bundler.allocator) catch return null; @@ -826,6 +829,10 @@ pub fn NewBundler(cache_files: bool) type { } }; + pub const BuildResolveResultPair = struct { + written: usize, + input_fd: ?StoredFileDescriptorType, + }; pub fn buildWithResolveResult( bundler: *ThisBundler, resolve_result: _resolver.Result, @@ -834,9 +841,13 @@ pub fn NewBundler(cache_files: bool) type { comptime Writer: type, writer: Writer, comptime import_path_format: options.BundleOptions.ImportPathFormat, - ) !usize { + file_descriptor: ?StoredFileDescriptorType, + ) !BuildResolveResultPair { if (resolve_result.is_external) { - return 0; + return BuildResolveResultPair{ + .written = 0, + .input_fd = null, + }; } errdefer bundler.resetStore(); @@ -847,20 +858,26 @@ pub fn NewBundler(cache_files: bool) type { var old_bundler_allocator = bundler.allocator; bundler.allocator = allocator; defer bundler.allocator = old_bundler_allocator; - var result = bundler.parse(allocator, file_path, loader, resolve_result.dirname_fd) orelse { + var result = bundler.parse(allocator, file_path, loader, resolve_result.dirname_fd, file_descriptor) orelse { bundler.resetStore(); - return 0; + return BuildResolveResultPair{ + .written = 0, + .input_fd = null, + }; }; var old_linker_allocator = bundler.linker.allocator; defer bundler.linker.allocator = old_linker_allocator; bundler.linker.allocator = allocator; try bundler.linker.link(file_path, &result, import_path_format); - return try bundler.print( - result, - Writer, - writer, - ); + return BuildResolveResultPair{ + .written = try bundler.print( + result, + Writer, + writer, + ), + .input_fd = result.input_fd, + }; // output_file.version = if (resolve_result.is_from_node_modules) resolve_result.package_json_version else null; } @@ -883,7 +900,7 @@ pub fn NewBundler(cache_files: bool) type { switch (loader) { .jsx, .tsx, .js, .ts, .json => { - var result = bundler.parse(bundler.allocator, file_path, loader, resolve_result.dirname_fd) orelse { + var result = bundler.parse(bundler.allocator, file_path, loader, resolve_result.dirname_fd, null) orelse { return null; }; @@ -965,6 +982,7 @@ pub fn NewBundler(cache_files: bool) type { file_path.text, resolve_result.dirname_fd, !cache_files, + null, ) catch return null; const source = logger.Source.initFile(Fs.File{ .path = file_path, .contents = entry.contents }, bundler.allocator) catch return null; @@ -1033,7 +1051,14 @@ pub fn NewBundler(cache_files: bool) type { ); } - pub fn parse(bundler: *ThisBundler, allocator: *std.mem.Allocator, path: Fs.Path, loader: options.Loader, dirname_fd: StoredFileDescriptorType) ?ParseResult { + pub fn parse( + bundler: *ThisBundler, + allocator: *std.mem.Allocator, + path: Fs.Path, + loader: options.Loader, + dirname_fd: StoredFileDescriptorType, + file_descriptor: ?StoredFileDescriptorType, + ) ?ParseResult { if (FeatureFlags.tracing) { bundler.timer.start(); } @@ -1044,7 +1069,13 @@ pub fn NewBundler(cache_files: bool) type { } } var result: ParseResult = undefined; - const entry = bundler.resolver.caches.fs.readFile(bundler.fs, path.text, dirname_fd, !cache_files) catch return null; + const entry = bundler.resolver.caches.fs.readFile( + bundler.fs, + path.text, + dirname_fd, + !cache_files, + file_descriptor, + ) catch return null; const source = logger.Source.initFile(Fs.File{ .path = path, .contents = entry.contents }, bundler.allocator) catch return null; @@ -1065,6 +1096,7 @@ pub fn NewBundler(cache_files: bool) type { .ast = value, .source = source, .loader = loader, + .input_fd = entry.fd, }; }, .json => { @@ -1082,6 +1114,7 @@ pub fn NewBundler(cache_files: bool) type { .ast = js_ast.Ast.initTest(parts), .source = source, .loader = loader, + .input_fd = entry.fd, }; }, .css => { @@ -1093,17 +1126,6 @@ pub fn NewBundler(cache_files: bool) type { return null; } - pub fn buildServeResultOutput(bundler: *ThisBundler, resolve: _resolver.Result, loader: options.Loader) !ServeResult.Output { - switch (loader) { - .js, .jsx, .ts, .tsx, .json => { - return ServeResult.Output{ .built = bundler.buildWithResolveResult(resolve) orelse error.BuildFailed }; - }, - else => { - return ServeResult.Output{ .file = ServeResult.Output.File{ .absolute_path = resolve.path_pair.primary.text } }; - }, - } - } - threadlocal var tmp_buildfile_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; // We try to be mostly stateless when serving diff --git a/src/cache.zig b/src/cache.zig index c28312fbf..07b4b152b 100644 --- a/src/cache.zig +++ b/src/cache.zig @@ -69,6 +69,7 @@ pub fn NewCache(comptime cache_files: bool) type { path: string, dirname_fd: StoredFileDescriptorType, comptime use_shared_buffer: bool, + _file_handle: ?StoredFileDescriptorType, ) !Entry { var rfs = _fs.fs; @@ -82,16 +83,18 @@ pub fn NewCache(comptime cache_files: bool) type { } } - var file_handle: std.fs.File = undefined; + var file_handle: std.fs.File = if (_file_handle) |__file| std.fs.File{ .handle = __file } else undefined; - if (FeatureFlags.store_file_descriptors and dirname_fd > 0) { - file_handle = try std.fs.Dir.openFile(std.fs.Dir{ .fd = dirname_fd }, std.fs.path.basename(path), .{ .read = true }); - } else { - file_handle = try std.fs.openFileAbsolute(path, .{ .read = true }); + if (_file_handle == null) { + if (FeatureFlags.store_file_descriptors and dirname_fd > 0) { + file_handle = try std.fs.Dir.openFile(std.fs.Dir{ .fd = dirname_fd }, std.fs.path.basename(path), .{ .read = true }); + } else { + file_handle = try std.fs.openFileAbsolute(path, .{ .read = true }); + } } defer { - if (rfs.needToCloseFiles()) { + if (rfs.needToCloseFiles() and _file_handle == null) { file_handle.close(); } } diff --git a/src/fs.zig b/src/fs.zig index d3957ebe3..043a7bd19 100644 --- a/src/fs.zig +++ b/src/fs.zig @@ -759,17 +759,21 @@ pub const FileSystem = struct { shared_buffer.reset(); try shared_buffer.growBy(size); shared_buffer.list.expandToCapacity(); - var read_count = file.readAll(shared_buffer.list.items) catch |err| { + // We use pread to ensure if the file handle was open, it doesn't seek from the last position + var read_count = file.preadAll(shared_buffer.list.items, 0) catch |err| { fs.readFileError(path, err); return err; }; shared_buffer.list.items = shared_buffer.list.items[0..read_count]; file_contents = shared_buffer.list.items; } else { - file_contents = file.readToEndAllocOptions(fs.allocator, size, size, @alignOf(u8), null) catch |err| { + // We use pread to ensure if the file handle was open, it doesn't seek from the last position + var buf = try fs.allocator.alloc(u8, size); + var read_count = file.preadAll(buf, 0) catch |err| { fs.readFileError(path, err); return err; }; + file_contents = buf[0..read_count]; } if (fs.watcher) |*watcher| { diff --git a/src/global.zig b/src/global.zig index 6610abafd..97727458f 100644 --- a/src/global.zig +++ b/src/global.zig @@ -48,6 +48,8 @@ pub const FeatureFlags = struct { pub const tracing = true; + pub const verbose_watcher = true; + pub const CSSModulePolyfill = enum { // When you import a .css file and you reference the import in JavaScript // Just return whatever the property key they referenced was @@ -59,7 +61,7 @@ pub const isDebug = std.builtin.Mode.Debug == std.builtin.mode; pub const isTest = std.builtin.is_test; pub const Output = struct { - var source: *Source = undefined; + threadlocal var source: *Source = undefined; pub const Source = struct { const StreamType = { if (isWasm) { diff --git a/src/http.zig b/src/http.zig index 7ec448f20..3eba74bfa 100644 --- a/src/http.zig +++ b/src/http.zig @@ -23,10 +23,12 @@ const Bundler = bundler.ServeBundler; const Websocket = @import("./http/websocket.zig"); const js_printer = @import("js_printer.zig"); const SOCKET_FLAGS = os.SOCK_CLOEXEC; - +const watcher = @import("./watcher.zig"); threadlocal var req_headers_buf: [100]picohttp.Header = undefined; threadlocal var res_headers_buf: [100]picohttp.Header = undefined; +const Watcher = watcher.NewWatcher(*Server); + const ENABLE_LOGGER = false; pub fn println(comptime fmt: string, args: anytype) void { // if (ENABLE_LOGGER) { @@ -169,6 +171,7 @@ pub const RequestContext = struct { has_called_done: bool = false, mime_type: MimeType = MimeType.other, controlled: bool = false, + watcher: *Watcher, res_headers_count: usize = 0, @@ -269,7 +272,13 @@ pub const RequestContext = struct { ctx.status = code; } - pub fn init(req: Request, arena: std.heap.ArenaAllocator, conn: *tcp.Connection, bundler_: *Bundler) !RequestContext { + pub fn init( + req: Request, + arena: std.heap.ArenaAllocator, + conn: *tcp.Connection, + bundler_: *Bundler, + watcher_: *Watcher, + ) !RequestContext { var ctx = RequestContext{ .request = req, .arena = arena, @@ -279,6 +288,7 @@ pub const RequestContext = struct { .conn = conn, .allocator = undefined, .method = Method.which(req.method) orelse return error.InvalidMethod, + .watcher = watcher_, }; return ctx; @@ -382,6 +392,16 @@ pub const RequestContext = struct { pub fn handle(self: WebsocketHandler) void { var this = self; + var stdout = std.io.getStdOut(); + // var stdout = std.io.bufferedWriter(stdout_file.writer()); + var stderr = std.io.getStdErr(); + // var stderr = std.io.bufferedWriter(stderr_file.writer()); + var output_source = Output.Source.init(stdout, stderr); + // defer stdout.flush() catch {}; + // defer stderr.flush() catch {}; + Output.Source.set(&output_source); + Output.enable_ansi_colors = stderr.isTty(); + _handle(&this) catch {}; } @@ -449,7 +469,6 @@ pub const RequestContext = struct { return; }, .Text => { - Output.print("Data: {s}", .{frame.data}); _ = try websocket.writeText(frame.data); }, .Ping => { @@ -538,16 +557,25 @@ pub const RequestContext = struct { ctx.url.extname, ); - ctx.mime_type = result.mime_type; - ctx.appendHeader("Content-Type", result.mime_type.value); if (ctx.keep_alive) { ctx.appendHeader("Connection", "keep-alive"); } + if (std.meta.activeTag(result.file.value) == .noop) { + return try ctx.sendNotFound(); + } + + ctx.mime_type = result.mime_type; + ctx.appendHeader("Content-Type", result.mime_type.value); + const send_body = ctx.method == .GET; switch (result.file.value) { .pending => |resolve_result| { + const hash = Watcher.getHash(result.file.input.text); + var watcher_index = ctx.watcher.indexOf(hash); + var input_fd = if (watcher_index) |ind| ctx.watcher.watchlist.items(.fd)[ind] else null; + if (resolve_result.is_external) { try ctx.sendBadRequest(); return; @@ -638,141 +666,6 @@ pub const RequestContext = struct { SocketPrinterInternal.getLastLastByte, ); - // const ChunkedTransferEncoding = struct { - // rctx: *RequestContext, - // has_disconnected: bool = false, - // chunk_written: usize = 0, - // pushed_chunks_count: usize = 0, - // disabled: bool = false, - - // threadlocal var chunk_buf: [8096]u8 = undefined; - // threadlocal var chunk_header_buf: [32]u8 = undefined; - // threadlocal var chunk_footer_buf: [2]u8 = undefined; - - // pub fn create(rctx: *RequestContext) @This() { - // return @This(){ - // .rctx = rctx, - // }; - // } - - // pub fn writeByte(chunky: *@This(), byte: u8) anyerror!usize { - // return try chunky.writeAll(&[_]u8{byte}); - // } - // pub fn writeAll(chunky: *@This(), bytes: anytype) anyerror!usize { - // // This lets us check if disabled without an extra branch - // const dest_chunk_written = (bytes.len + chunky.chunk_written) * @intCast(usize, @boolToInt(!chunky.disabled)); - // switch (dest_chunk_written) { - // 0 => { - // return 0; - // }, - // // Fast path - // 1...chunk_buf.len => { - // std.mem.copy(u8, chunk_buf[chunky.chunk_written..dest_chunk_written], bytes); - // chunky.chunk_written = dest_chunk_written; - // return bytes.len; - // }, - // // Slow path - // else => { - // var byte_slice: []const u8 = bytes[0..bytes.len]; - // while (byte_slice.len > 0) { - // var remainder_slice = chunk_buf[chunky.chunk_written..]; - // const copied_size = std.math.min(remainder_slice.len, byte_slice.len); - - // std.mem.copy(u8, remainder_slice, byte_slice[0..copied_size]); - // byte_slice = byte_slice[copied_size..]; - - // chunky.chunk_written += copied_size; - - // if (chunky.chunk_written >= chunk_buf.len) { - // chunky.flush() catch |err| { - // return err; - // }; - // } - // } - // return bytes.len; - // }, - // } - // } - - // pub fn flush(chunky: *@This()) anyerror!void { - // if (!chunky.rctx.has_written_last_header) { - // try chunky.rctx.writeStatus(200); - // try chunky.rctx.prepareToSendBody(0, true); - // } - - // // how much are we pushing? - // // remember, this won't always be a full chunk size - // const content_length = chunky.chunk_written; - // // it could be zero if it's the final chunk - // var content_length_buf_size = std.fmt.formatIntBuf(&chunk_header_buf, content_length, 16, true, .{}); - // var after_content_length = chunk_header_buf[content_length_buf_size..]; - // after_content_length[0] = '\r'; - // after_content_length[1] = '\n'; - - // var written = try chunky.rctx.conn.client.write(chunk_header_buf[0 .. content_length_buf_size + 2], SOCKET_FLAGS); - // if (written == 0) { - // chunky.disabled = true; - // return error.SocketClosed; - // } - // written = try chunky.rctx.conn.client.write(chunk_buf[0..chunky.chunk_written], SOCKET_FLAGS); - // chunky.chunk_written = chunky.chunk_written - written; - - // chunky.pushed_chunks_count += 1; - // } - - // pub fn done(chunky: *@This()) anyerror!void { - // if (chunky.disabled) { - // return; - // } - - // defer chunky.rctx.done(); - - // // Actually, its just one chunk so we'll send it all at once - // // instead of using transfer encoding - // if (chunky.pushed_chunks_count == 0 and !chunky.rctx.has_written_last_header) { - - // // turns out it's empty! - // if (chunky.chunk_written == 0) { - // try chunky.rctx.sendNoContent(); - - // return; - // } - - // const buffer = chunk_buf[0..chunky.chunk_written]; - - // if (FeatureFlags.strong_etags_for_built_files) { - // const strong_etag = std.hash.Wyhash.hash(1, buffer); - // const etag_content_slice = std.fmt.bufPrintIntToSlice(strong_etag_buffer[0..49], strong_etag, 16, true, .{}); - - // chunky.rctx.appendHeader("ETag", etag_content_slice); - - // if (chunky.rctx.header("If-None-Match")) |etag_header| { - // if (std.mem.eql(u8, etag_content_slice, etag_header.value)) { - // try chunky.rctx.sendNotModified(); - // return; - // } - // } - // } - - // try chunky.rctx.writeStatus(200); - // try chunky.rctx.prepareToSendBody(chunky.chunk_written, false); - // try chunky.rctx.writeBodyBuf(buffer); - // return; - // } - - // if (chunky.chunk_written > 0) { - // try chunky.flush(); - // } - - // _ = try chunky.rctx.writeSocket("0\r\n\r\n", SOCKET_FLAGS); - // } - - // pub const Writer = js_printer.NewWriter(@This(), writeByte, writeAll); - // pub fn writer(chunky: *@This()) Writer { - // return Writer.init(chunky.*); - // } - // }; - var chunked_encoder = SocketPrinter.init(SocketPrinterInternal.init(ctx)); // It will call flush for us automatically @@ -785,13 +678,29 @@ pub const RequestContext = struct { SocketPrinter, chunked_encoder, .absolute_url, + input_fd, ); + if (written.input_fd) |written_fd| { + try ctx.watcher.addFile(written_fd, result.file.input.text, hash, true); + if (ctx.watcher.watchloop_handle == null) { + try ctx.watcher.start(); + } + } }, .noop => { try ctx.sendNotFound(); }, .copy, .move => |file| { - defer std.os.close(file.fd); + // defer std.os.close(file.fd); + defer { + if (ctx.watcher.addFile(file.fd, result.file.input.text, Watcher.getHash(result.file.input.text), true)) { + if (ctx.watcher.watchloop_handle == null) { + ctx.watcher.start() catch |err| { + Output.prettyErrorln("Failed to start watcher: {s}", .{@errorName(err)}); + }; + } + } else |err| {} + } // if (result.mime_type.category != .html) { // hash(absolute_file_path, size, mtime) @@ -916,6 +825,7 @@ pub const Server = struct { log: logger.Log, allocator: *std.mem.Allocator, bundler: Bundler, + watcher: *Watcher, pub fn adjustUlimit() !void { var limit = try std.os.getrlimit(.NOFILE); @@ -935,6 +845,13 @@ pub const Server = struct { server.handleConnection(&conn); } + pub fn onFileUpdate(ctx: *Server, events: []watcher.WatchEvent, watchlist: watcher.Watchlist) void { + for (events) |event| { + const item = watchlist.items(.file_path)[event.index]; + Output.prettyln("File changed: \"<b>{s}<r>\"", .{item}); + } + } + fn run(server: *Server) !void { adjustUlimit() catch {}; const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true }); @@ -1004,7 +921,13 @@ pub const Server = struct { req_ctx.arena.deinit(); } } - req_ctx = RequestContext.init(req, request_arena, conn, &server.bundler) catch |err| { + req_ctx = RequestContext.init( + req, + request_arena, + conn, + &server.bundler, + server.watcher, + ) catch |err| { Output.printErrorln("<r>[<red>{s}<r>] - <b>{s}<r>: {s}", .{ @errorName(err), req.method, req.path }); conn.client.deinit(); return; @@ -1050,16 +973,23 @@ pub const Server = struct { } } + pub fn initWatcher(server: *Server) !void { + server.watcher = try Watcher.init(server, server.bundler.fs, server.allocator); + } + pub fn start(allocator: *std.mem.Allocator, options: Api.TransformOptions) !void { var log = logger.Log.init(allocator); var server = Server{ .allocator = allocator, .log = log, .bundler = undefined, + .watcher = undefined, }; server.bundler = try Bundler.init(allocator, &server.log, options); server.bundler.configureLinker(); + try server.initWatcher(); + try server.run(); } }; diff --git a/src/resolver/package_json.zig b/src/resolver/package_json.zig index e8c8a98b5..a7b156dea 100644 --- a/src/resolver/package_json.zig +++ b/src/resolver/package_json.zig @@ -59,7 +59,13 @@ pub const PackageJSON = struct { const package_json_path_ = r.fs.abs(&parts); const package_json_path = r.fs.filename_store.append(package_json_path_) catch unreachable; - const entry = r.caches.fs.readFile(r.fs, package_json_path, dirname_fd, false) catch |err| { + const entry = r.caches.fs.readFile( + r.fs, + package_json_path, + dirname_fd, + false, + null, + ) catch |err| { if (err != error.IsDir) { r.log.addErrorFmt(null, logger.Loc.Empty, r.allocator, "Cannot read file \"{s}\": {s}", .{ r.prettyPath(fs.Path.init(input_path)), @errorName(err) }) catch unreachable; } diff --git a/src/resolver/resolver.zig b/src/resolver/resolver.zig index 6819fe3f0..41ea7cd9c 100644 --- a/src/resolver/resolver.zig +++ b/src/resolver/resolver.zig @@ -856,6 +856,7 @@ pub fn NewResolver(cache_files: bool) type { file, dirname_fd, false, + null, ); const key_path = Path.init(file); diff --git a/src/watcher.zig b/src/watcher.zig new file mode 100644 index 000000000..406703602 --- /dev/null +++ b/src/watcher.zig @@ -0,0 +1,232 @@ +const Fs = @import("./fs.zig"); +const std = @import("std"); +usingnamespace @import("global.zig"); +const sync = @import("sync.zig"); + +const os = std.os; +const KEvent = std.os.Kevent; + +pub const WatchItem = struct { + file_path: string, + // filepath hash for quick comparison + hash: u32, + eventlist_index: u32, + fd: StoredFileDescriptorType, +}; + +pub const WatchEvent = struct { + index: u32, + op: Op, + + pub fn fromKEvent(this: *WatchEvent, kevent: *const KEvent) void { + this.op.delete = (kevent.fflags & std.os.NOTE_DELETE) > 0; + this.op.metadata = (kevent.fflags & std.os.NOTE_ATTRIB) > 0; + this.op.rename = (kevent.fflags & std.os.NOTE_RENAME) > 0; + this.op.write = (kevent.fflags & std.os.NOTE_WRITE) > 0; + this.index = @truncate(u32, kevent.udata); + } + + pub const Op = packed struct { + delete: bool = false, + metadata: bool = false, + rename: bool = false, + write: bool = false, + }; +}; + +pub const Watchlist = std.MultiArrayList(WatchItem); + +// This implementation only works on macOS, for now. +// The Internet seems to suggest basically always using FSEvents instead of kqueue +// It seems like the main concern is max open file descriptors +// Since we adjust the ulimit already, I think we can avoid that. +pub fn NewWatcher(comptime ContextType: type) type { + return struct { + const Watcher = @This(); + + const KEventArrayList = std.ArrayList(KEvent); + + watchlist: Watchlist, + watched_count: usize = 0, + mutex: sync.Mutex, + + // Internal + changelist: [128]KEvent = undefined, + changed_count: u8 = 0, + + // User-facing + watch_events: [128]WatchEvent = undefined, + + // Everything being watched + eventlist: [8096]KEvent = undefined, + eventlist_used: usize = 0, + + fs: *Fs.FileSystem, + // this is what kqueue knows about + fd: StoredFileDescriptorType, + ctx: ContextType, + allocator: *std.mem.Allocator, + watchloop_handle: ?u64 = null, + + pub fn init(ctx: ContextType, fs: *Fs.FileSystem, allocator: *std.mem.Allocator) !*Watcher { + var watcher = try allocator.create(Watcher); + watcher.* = Watcher{ + .fs = fs, + .fd = 0, + .allocator = allocator, + .watched_count = 0, + .ctx = ctx, + .watchlist = Watchlist{}, + .mutex = sync.Mutex.init(), + }; + return watcher; + } + + pub fn getQueue(this: *Watcher) !StoredFileDescriptorType { + if (this.fd == 0) { + this.fd = try os.kqueue(); + if (this.fd == 0) { + return error.WatcherFailed; + } + } + + return this.fd; + } + + pub fn start(this: *Watcher) !void { + _ = try this.getQueue(); + std.debug.assert(this.watchloop_handle == null); + + _ = try std.Thread.spawn(Watcher.watchLoop, this); + } + + // This must only be called from the watcher thread + pub fn watchLoop(this: *Watcher) !void { + this.watchloop_handle = std.Thread.getCurrentThreadId(); + var stdout = std.io.getStdOut(); + // var stdout = std.io.bufferedWriter(stdout_file.writer()); + var stderr = std.io.getStdErr(); + // var stderr = std.io.bufferedWriter(stderr_file.writer()); + var output_source = Output.Source.init(stdout, stderr); + // defer stdout.flush() catch {}; + // defer stderr.flush() catch {}; + Output.Source.set(&output_source); + Output.enable_ansi_colors = stderr.isTty(); + + defer Output.flush(); + if (FeatureFlags.verbose_watcher) Output.prettyln("Watcher started", .{}); + + this._watchLoop() catch |err| { + Output.prettyErrorln("<r>Watcher crashed: <red><b>{s}<r>", .{@errorName(err)}); + + this.watchloop_handle = null; + std.os.close(this.fd); + this.fd = 0; + return; + }; + } + + fn _watchLoop(this: *Watcher) !void { + const time = std.time; + + // poll at 1 second intervals if it hasn't received any events. + // var timeout_spec = null; + std.debug.assert(this.fd > 0); + + var changelist_array: [1]KEvent = std.mem.zeroes([1]KEvent); + var changelist = &changelist_array; + while (true) { + defer Output.flush(); + var code = std.os.system.kevent( + try this.getQueue(), + @as([*]KEvent, changelist), + 0, + @as([*]KEvent, changelist), + 1, + + null, + ); + + var watchevents = this.watch_events[0..1]; + for (changelist) |event, i| { + watchevents[i].fromKEvent(&event); + } + + this.ctx.onFileUpdate(watchevents, this.watchlist); + } + } + pub fn getHash(filepath: string) u32 { + return @truncate(u32, std.hash.Wyhash.hash(0, filepath)); + } + pub fn indexOf(this: *Watcher, hash: u32) ?usize { + for (this.watchlist.items(.hash)) |other, i| { + if (hash == other) { + return i; + } + } + return null; + } + + pub fn addFile( + this: *Watcher, + fd: StoredFileDescriptorType, + file_path: string, + hash: u32, + comptime copy_file_path: bool, + ) !void { + if (this.indexOf(hash) != null) { + return; + } + + try this.watchlist.ensureUnusedCapacity(this.allocator, 1); + + // https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/kqueue.2.html + var event = std.mem.zeroes(KEvent); + + event.flags = os.EV_ADD | os.EV_CLEAR | os.EV_ENABLE; + // we want to know about the vnode + event.filter = std.os.EVFILT_VNODE; + + // monitor: + // - Delete + // - Write + // - Metadata + // - Rename + event.fflags = std.os.NOTE_WRITE | std.os.NOTE_RENAME; + + // id + event.ident = @intCast(usize, fd); + + const index = this.eventlist_used; + this.eventlist_used += 1; + const watchlist_id = this.watchlist.len; + // Store the hash for fast filtering later + event.udata = @intCast(usize, watchlist_id); + this.eventlist[index] = event; + + // This took a lot of work to figure out the right permutation + // Basically: + // - We register the event here. + // our while(true) loop above receives notification of changes to any of the events created here. + _ = std.os.system.kevent( + try this.getQueue(), + this.eventlist[index .. index + 1].ptr, + 1, + this.eventlist[index .. index + 1].ptr, + 0, + null, + ); + + this.watchlist.appendAssumeCapacity(.{ + .file_path = if (copy_file_path) try this.allocator.dupe(u8, file_path) else file_path, + .fd = fd, + .hash = hash, + .eventlist_index = @truncate(u32, index), + }); + + if (FeatureFlags.verbose_watcher) { + Output.prettyln("<r>Added <b>{s}<r> to watch list.", .{file_path}); + } + } + }; +} |