diff options
author | 2022-11-13 19:14:44 -0800 | |
---|---|---|
committer | 2022-11-13 19:14:44 -0800 | |
commit | b18e4064a2efe2a1963b29d1bdfc33cd48070048 (patch) | |
tree | c2918a7cfb258853a5741d1ec7a64be52bd904ee /src/bun.js/base.zig | |
parent | 58b67347e640be66814a22712ceecdc506ae6e53 (diff) | |
download | bun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.tar.gz bun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.tar.zst bun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.zip |
Make node streams faster (#1502)
* Make node streams faster
* Fix for macOS, improve performance, handle ref and unref
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/base.zig | 120 |
1 files changed, 78 insertions, 42 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 37eb3e626..deeab246a 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -3985,7 +3985,7 @@ pub const FilePoll = struct { flags: Flags.Set = Flags.Set{}, owner: Owner = Deactivated.owner, - const FileBlobLoader = JSC.WebCore.FileBlobLoader; + const FileReader = JSC.WebCore.FileReader; const FileSink = JSC.WebCore.FileSink; const Subprocess = JSC.Subprocess; const BufferedInput = Subprocess.BufferedInput; @@ -3995,7 +3995,7 @@ pub const FilePoll = struct { }; pub const Owner = bun.TaggedPointerUnion(.{ - FileBlobLoader, + FileReader, FileSink, Subprocess, BufferedInput, @@ -4016,12 +4016,12 @@ pub const FilePoll = struct { pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void { poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*)); - poll.onUpdate(loop); + poll.onUpdate(loop, kqueue_event.data); } pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void { poll.updateFlags(Flags.fromEpollEvent(epoll_event.*)); - poll.onUpdate(loop); + poll.onUpdate(loop, 0); } pub fn clearEvent(poll: *FilePoll, flag: Flags) void { @@ -4072,16 +4072,16 @@ pub const FilePoll = struct { return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process); } - pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void { + pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop, size_or_offset: i64) void { if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) { if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop); poll.flags.insert(.needs_rearm); } var ptr = poll.owner; switch (ptr.tag()) { - @field(Owner.Tag, "FileBlobLoader") => { - log("onUpdate: FileBlobLoader", .{}); - ptr.as(FileBlobLoader).onPoll(0, 0); + @field(Owner.Tag, "FileReader") => { + log("onUpdate: FileReader", .{}); + ptr.as(FileReader).onPoll(size_or_offset, 0); }, @field(Owner.Tag, "Subprocess") => { log("onUpdate: Subprocess", .{}); @@ -4092,18 +4092,18 @@ pub const FilePoll = struct { @field(Owner.Tag, "FileSink") => { log("onUpdate: FileSink", .{}); var loader = ptr.as(JSC.WebCore.FileSink); - loader.onPoll(0, 0); + loader.onPoll(size_or_offset, 0); }, @field(Owner.Tag, "BufferedInput") => { log("onUpdate: BufferedInput", .{}); var loader = ptr.as(JSC.Subprocess.BufferedInput); - loader.onReady(0); + loader.onReady(size_or_offset); }, @field(Owner.Tag, "BufferedOutput") => { log("onUpdate: BufferedOutput", .{}); var loader = ptr.as(JSC.Subprocess.BufferedOutput); - loader.ready(0); + loader.ready(size_or_offset); }, else => {}, } @@ -4196,34 +4196,45 @@ pub const FilePoll = struct { const log = Output.scoped(.FilePoll, false); pub inline fn isActive(this: *const FilePoll) bool { - return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable); + return this.flags.contains(.has_incremented_poll_count); } /// Make calling ref() on this poll into a no-op. - pub fn disable(this: *FilePoll) void { - if (this.isRegistered()) { - this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?); - } - - this.unref(); + pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (this.flags.contains(.disable)) + return; this.flags.insert(.disable); + + vm.uws_event_loop.?.active -= @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + } + + pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { + if (!this.flags.contains(.disable)) + return; + this.flags.remove(.disable); + + vm.uws_event_loop.?.active += @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + } + + pub fn canActivate(this: *const FilePoll) bool { + return !this.flags.contains(.has_incremented_poll_count); } /// Only intended to be used from EventLoop.Pollable pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void { std.debug.assert(this.flags.contains(.has_incremented_poll_count)); + loop.num_polls -= @as(i32, @boolToInt(this.flags.contains(.has_incremented_poll_count))); + loop.active -= @as(u32, @boolToInt(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count))); + this.flags.remove(.has_incremented_poll_count); - loop.num_polls -= 1; - loop.active -= 1; } /// Only intended to be used from EventLoop.Pollable pub fn activate(this: *FilePoll, loop: *uws.Loop) void { - std.debug.assert(!this.flags.contains(.has_incremented_poll_count)); - std.debug.assert(!this.flags.contains(.disable)); + loop.num_polls += @as(i32, @boolToInt(!this.flags.contains(.has_incremented_poll_count))); + loop.active += @as(u32, @boolToInt(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count))); + this.flags.insert(.has_incremented_poll_count); - loop.num_polls += 1; - loop.active += 1; } pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll { @@ -4240,9 +4251,20 @@ pub const FilePoll = struct { return poll; } + pub inline fn canRef(this: *const FilePoll) bool { + if (this.flags.contains(.disable)) + return false; + + return !this.flags.contains(.has_incremented_poll_count); + } + + pub inline fn canUnref(this: *const FilePoll) bool { + return this.flags.contains(.has_incremented_poll_count); + } + /// Prevent a poll from keeping the process alive. pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (!this.isActive()) + if (!this.canUnref()) return; log("unref", .{}); this.deactivate(vm.uws_event_loop.?); @@ -4250,7 +4272,7 @@ pub const FilePoll = struct { /// Allow a poll to keep the process alive. pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void { - if (this.isActive()) + if (this.canRef()) return; log("ref", .{}); this.activate(vm.uws_event_loop.?); @@ -4296,11 +4318,13 @@ pub const FilePoll = struct { std.debug.assert(this.fd != invalid_fd); if (comptime Environment.isLinux) { + const one_shot_flag: u32 = if (!this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT; + const flags: u32 = switch (flag) { .process, .readable, - => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), - .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT), + => linux.EPOLL.IN | linux.EPOLL.HUP | one_shot_flag, + .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | one_shot_flag, else => unreachable, }; @@ -4318,6 +4342,7 @@ pub const FilePoll = struct { } } else if (comptime Environment.isMac) { var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); + const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT; changelist[0] = switch (flag) { .readable => .{ .ident = @intCast(u64, fd), @@ -4325,7 +4350,7 @@ pub const FilePoll = struct { .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, .writable => .{ @@ -4334,7 +4359,7 @@ pub const FilePoll = struct { .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD | std.c.EV_ONESHOT, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, .process => .{ @@ -4343,9 +4368,10 @@ pub const FilePoll = struct { .data = 0, .fflags = std.c.NOTE_EXIT, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_ADD, + .flags = std.c.EV_ADD | one_shot_flag, .ext = .{ 0, 0 }, }, + else => unreachable, }; // output events only include change errors @@ -4393,17 +4419,20 @@ pub const FilePoll = struct { } else { @compileError("TODO: Pollable"); } - - if (!this.isActive()) this.activate(loop); + if (this.canActivate()) + this.activate(loop); this.flags.insert(switch (flag) { - .process, .readable => .poll_readable, + .readable => .poll_readable, + .process => if (comptime Environment.isLinux) .poll_readable else .poll_process, .writable => .poll_writable, else => unreachable, }); + this.flags.remove(.needs_rearm); + return JSC.Maybe(void).success; } - pub const invalid_fd = std.math.maxInt(u32); + pub const invalid_fd = JSC.Node.invalid_fd; pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) { if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) { @@ -4424,6 +4453,14 @@ pub const FilePoll = struct { return JSC.Maybe(void).success; }; + if (this.flags.contains(.needs_rearm)) { + log("unregister: {s} ({d}) skipped due to needs_rearm", .{ @tagName(flag), fd }); + this.flags.remove(.poll_process); + this.flags.remove(.poll_readable); + this.flags.remove(.poll_process); + return JSC.Maybe(void).success; + } + log("unregister: {s} ({d})", .{ @tagName(flag), fd }); if (comptime Environment.isLinux) { @@ -4441,22 +4478,22 @@ pub const FilePoll = struct { var changelist = std.mem.zeroes([2]std.os.system.kevent64_s); changelist[0] = switch (flag) { - .read => .{ + .readable => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_READ, .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, - .write => .{ + .writable => .{ .ident = @intCast(u64, fd), .filter = std.os.system.EVFILT_WRITE, .data = 0, .fflags = 0, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, .process => .{ @@ -4465,7 +4502,7 @@ pub const FilePoll = struct { .data = 0, .fflags = std.c.NOTE_EXIT, .udata = @ptrToInt(Pollable.init(this).ptr()), - .flags = std.c.EV_DELETE | std.c.EV_ONESHOT, + .flags = std.c.EV_DELETE, .ext = .{ 0, 0 }, }, else => unreachable, @@ -4498,10 +4535,9 @@ pub const FilePoll = struct { } const errno = std.c.getErrno(rc); - switch (rc) { std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?, - else => unreachable, + else => {}, } } else { @compileError("TODO: Pollable"); |