aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/base.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-11-13 19:14:44 -0800
committerGravatar GitHub <noreply@github.com> 2022-11-13 19:14:44 -0800
commitb18e4064a2efe2a1963b29d1bdfc33cd48070048 (patch)
treec2918a7cfb258853a5741d1ec7a64be52bd904ee /src/bun.js/base.zig
parent58b67347e640be66814a22712ceecdc506ae6e53 (diff)
downloadbun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.tar.gz
bun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.tar.zst
bun-b18e4064a2efe2a1963b29d1bdfc33cd48070048.zip
Make node streams faster (#1502)
* Make node streams faster * Fix for macOS, improve performance, handle ref and unref Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to '')
-rw-r--r--src/bun.js/base.zig120
1 files changed, 78 insertions, 42 deletions
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 37eb3e626..deeab246a 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -3985,7 +3985,7 @@ pub const FilePoll = struct {
flags: Flags.Set = Flags.Set{},
owner: Owner = Deactivated.owner,
- const FileBlobLoader = JSC.WebCore.FileBlobLoader;
+ const FileReader = JSC.WebCore.FileReader;
const FileSink = JSC.WebCore.FileSink;
const Subprocess = JSC.Subprocess;
const BufferedInput = Subprocess.BufferedInput;
@@ -3995,7 +3995,7 @@ pub const FilePoll = struct {
};
pub const Owner = bun.TaggedPointerUnion(.{
- FileBlobLoader,
+ FileReader,
FileSink,
Subprocess,
BufferedInput,
@@ -4016,12 +4016,12 @@ pub const FilePoll = struct {
pub fn onKQueueEvent(poll: *FilePoll, loop: *uws.Loop, kqueue_event: *const std.os.system.kevent64_s) void {
poll.updateFlags(Flags.fromKQueueEvent(kqueue_event.*));
- poll.onUpdate(loop);
+ poll.onUpdate(loop, kqueue_event.data);
}
pub fn onEpollEvent(poll: *FilePoll, loop: *uws.Loop, epoll_event: *std.os.linux.epoll_event) void {
poll.updateFlags(Flags.fromEpollEvent(epoll_event.*));
- poll.onUpdate(loop);
+ poll.onUpdate(loop, 0);
}
pub fn clearEvent(poll: *FilePoll, flag: Flags) void {
@@ -4072,16 +4072,16 @@ pub const FilePoll = struct {
return this.flags.contains(.poll_writable) or this.flags.contains(.poll_readable) or this.flags.contains(.poll_process);
}
- pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop) void {
+ pub fn onUpdate(poll: *FilePoll, loop: *uws.Loop, size_or_offset: i64) void {
if (poll.flags.contains(.one_shot) and !poll.flags.contains(.needs_rearm)) {
if (poll.flags.contains(.has_incremented_poll_count)) poll.deactivate(loop);
poll.flags.insert(.needs_rearm);
}
var ptr = poll.owner;
switch (ptr.tag()) {
- @field(Owner.Tag, "FileBlobLoader") => {
- log("onUpdate: FileBlobLoader", .{});
- ptr.as(FileBlobLoader).onPoll(0, 0);
+ @field(Owner.Tag, "FileReader") => {
+ log("onUpdate: FileReader", .{});
+ ptr.as(FileReader).onPoll(size_or_offset, 0);
},
@field(Owner.Tag, "Subprocess") => {
log("onUpdate: Subprocess", .{});
@@ -4092,18 +4092,18 @@ pub const FilePoll = struct {
@field(Owner.Tag, "FileSink") => {
log("onUpdate: FileSink", .{});
var loader = ptr.as(JSC.WebCore.FileSink);
- loader.onPoll(0, 0);
+ loader.onPoll(size_or_offset, 0);
},
@field(Owner.Tag, "BufferedInput") => {
log("onUpdate: BufferedInput", .{});
var loader = ptr.as(JSC.Subprocess.BufferedInput);
- loader.onReady(0);
+ loader.onReady(size_or_offset);
},
@field(Owner.Tag, "BufferedOutput") => {
log("onUpdate: BufferedOutput", .{});
var loader = ptr.as(JSC.Subprocess.BufferedOutput);
- loader.ready(0);
+ loader.ready(size_or_offset);
},
else => {},
}
@@ -4196,34 +4196,45 @@ pub const FilePoll = struct {
const log = Output.scoped(.FilePoll, false);
pub inline fn isActive(this: *const FilePoll) bool {
- return this.flags.contains(.has_incremented_poll_count) and !this.flags.contains(.disable);
+ return this.flags.contains(.has_incremented_poll_count);
}
/// Make calling ref() on this poll into a no-op.
- pub fn disable(this: *FilePoll) void {
- if (this.isRegistered()) {
- this.unregister(JSC.VirtualMachine.vm.uws_event_loop.?);
- }
-
- this.unref();
+ pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (this.flags.contains(.disable))
+ return;
this.flags.insert(.disable);
+
+ vm.uws_event_loop.?.active -= @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ }
+
+ pub fn enableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void {
+ if (!this.flags.contains(.disable))
+ return;
+ this.flags.remove(.disable);
+
+ vm.uws_event_loop.?.active += @as(u32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ }
+
+ pub fn canActivate(this: *const FilePoll) bool {
+ return !this.flags.contains(.has_incremented_poll_count);
}
/// Only intended to be used from EventLoop.Pollable
pub fn deactivate(this: *FilePoll, loop: *uws.Loop) void {
std.debug.assert(this.flags.contains(.has_incremented_poll_count));
+ loop.num_polls -= @as(i32, @boolToInt(this.flags.contains(.has_incremented_poll_count)));
+ loop.active -= @as(u32, @boolToInt(!this.flags.contains(.disable) and this.flags.contains(.has_incremented_poll_count)));
+
this.flags.remove(.has_incremented_poll_count);
- loop.num_polls -= 1;
- loop.active -= 1;
}
/// Only intended to be used from EventLoop.Pollable
pub fn activate(this: *FilePoll, loop: *uws.Loop) void {
- std.debug.assert(!this.flags.contains(.has_incremented_poll_count));
- std.debug.assert(!this.flags.contains(.disable));
+ loop.num_polls += @as(i32, @boolToInt(!this.flags.contains(.has_incremented_poll_count)));
+ loop.active += @as(u32, @boolToInt(!this.flags.contains(.disable) and !this.flags.contains(.has_incremented_poll_count)));
+
this.flags.insert(.has_incremented_poll_count);
- loop.num_polls += 1;
- loop.active += 1;
}
pub fn init(vm: *JSC.VirtualMachine, fd: JSC.Node.FileDescriptor, flags: Flags.Struct, comptime Type: type, owner: *Type) *FilePoll {
@@ -4240,9 +4251,20 @@ pub const FilePoll = struct {
return poll;
}
+ pub inline fn canRef(this: *const FilePoll) bool {
+ if (this.flags.contains(.disable))
+ return false;
+
+ return !this.flags.contains(.has_incremented_poll_count);
+ }
+
+ pub inline fn canUnref(this: *const FilePoll) bool {
+ return this.flags.contains(.has_incremented_poll_count);
+ }
+
/// Prevent a poll from keeping the process alive.
pub fn unref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
- if (!this.isActive())
+ if (!this.canUnref())
return;
log("unref", .{});
this.deactivate(vm.uws_event_loop.?);
@@ -4250,7 +4272,7 @@ pub const FilePoll = struct {
/// Allow a poll to keep the process alive.
pub fn ref(this: *FilePoll, vm: *JSC.VirtualMachine) void {
- if (this.isActive())
+ if (this.canRef())
return;
log("ref", .{});
this.activate(vm.uws_event_loop.?);
@@ -4296,11 +4318,13 @@ pub const FilePoll = struct {
std.debug.assert(this.fd != invalid_fd);
if (comptime Environment.isLinux) {
+ const one_shot_flag: u32 = if (!this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT;
+
const flags: u32 = switch (flag) {
.process,
.readable,
- => linux.EPOLL.IN | linux.EPOLL.HUP | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
- .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | (if (this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT),
+ => linux.EPOLL.IN | linux.EPOLL.HUP | one_shot_flag,
+ .writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | one_shot_flag,
else => unreachable,
};
@@ -4318,6 +4342,7 @@ pub const FilePoll = struct {
}
} else if (comptime Environment.isMac) {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
+ const one_shot_flag: @TypeOf(changelist[0].flags) = if (!this.flags.contains(.one_shot)) 0 else std.c.EV_ONESHOT;
changelist[0] = switch (flag) {
.readable => .{
.ident = @intCast(u64, fd),
@@ -4325,7 +4350,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
.writable => .{
@@ -4334,7 +4359,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD | std.c.EV_ONESHOT,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
.process => .{
@@ -4343,9 +4368,10 @@ pub const FilePoll = struct {
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_ADD,
+ .flags = std.c.EV_ADD | one_shot_flag,
.ext = .{ 0, 0 },
},
+ else => unreachable,
};
// output events only include change errors
@@ -4393,17 +4419,20 @@ pub const FilePoll = struct {
} else {
@compileError("TODO: Pollable");
}
-
- if (!this.isActive()) this.activate(loop);
+ if (this.canActivate())
+ this.activate(loop);
this.flags.insert(switch (flag) {
- .process, .readable => .poll_readable,
+ .readable => .poll_readable,
+ .process => if (comptime Environment.isLinux) .poll_readable else .poll_process,
.writable => .poll_writable,
else => unreachable,
});
+ this.flags.remove(.needs_rearm);
+
return JSC.Maybe(void).success;
}
- pub const invalid_fd = std.math.maxInt(u32);
+ pub const invalid_fd = JSC.Node.invalid_fd;
pub fn unregister(this: *FilePoll, loop: *uws.Loop) JSC.Maybe(void) {
if (!(this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process))) {
@@ -4424,6 +4453,14 @@ pub const FilePoll = struct {
return JSC.Maybe(void).success;
};
+ if (this.flags.contains(.needs_rearm)) {
+ log("unregister: {s} ({d}) skipped due to needs_rearm", .{ @tagName(flag), fd });
+ this.flags.remove(.poll_process);
+ this.flags.remove(.poll_readable);
+ this.flags.remove(.poll_process);
+ return JSC.Maybe(void).success;
+ }
+
log("unregister: {s} ({d})", .{ @tagName(flag), fd });
if (comptime Environment.isLinux) {
@@ -4441,22 +4478,22 @@ pub const FilePoll = struct {
var changelist = std.mem.zeroes([2]std.os.system.kevent64_s);
changelist[0] = switch (flag) {
- .read => .{
+ .readable => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_READ,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
- .write => .{
+ .writable => .{
.ident = @intCast(u64, fd),
.filter = std.os.system.EVFILT_WRITE,
.data = 0,
.fflags = 0,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
.process => .{
@@ -4465,7 +4502,7 @@ pub const FilePoll = struct {
.data = 0,
.fflags = std.c.NOTE_EXIT,
.udata = @ptrToInt(Pollable.init(this).ptr()),
- .flags = std.c.EV_DELETE | std.c.EV_ONESHOT,
+ .flags = std.c.EV_DELETE,
.ext = .{ 0, 0 },
},
else => unreachable,
@@ -4498,10 +4535,9 @@ pub const FilePoll = struct {
}
const errno = std.c.getErrno(rc);
-
switch (rc) {
std.math.minInt(@TypeOf(rc))...-1 => return JSC.Maybe(void).errnoSys(@enumToInt(errno), .kevent).?,
- else => unreachable,
+ else => {},
}
} else {
@compileError("TODO: Pollable");