aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-01-24 23:30:23 -0800
committerGravatar Jarred SUmner <jarred@jarredsumner.com> 2022-01-24 23:30:23 -0800
commit0e138bcc8f82715fcd84289ee7410570ceedd2a2 (patch)
tree47cb6e4abedf3b18a0e6391dbdb8933f5da6072f
parentecea12d2061f3508b9e5686d605ab7e6e6ac932e (diff)
downloadbun-0e138bcc8f82715fcd84289ee7410570ceedd2a2.tar.gz
bun-0e138bcc8f82715fcd84289ee7410570ceedd2a2.tar.zst
bun-0e138bcc8f82715fcd84289ee7410570ceedd2a2.zip
Fallback to readev / writev
-rw-r--r--src/io/io_linux.zig233
1 files changed, 226 insertions, 7 deletions
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index 1bf90eec6..c950ee8ea 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -1,5 +1,6 @@
const std = @import("std");
const assert = std.debug.assert;
+const Platform = @import("analytics").GenerateHeader.GeneratePlatform;
const os = struct {
pub usingnamespace std.os;
pub const EPERM = 1;
@@ -141,6 +142,16 @@ const os = struct {
pub const EHWPOISON = 133;
};
+pub const pretend_older_kernel = false;
+const Features = struct {
+ connect_poll: bool = pretend_older_kernel,
+ close_poll: bool = pretend_older_kernel,
+ replace_recv_with_readv: bool = pretend_older_kernel,
+ replace_send_with_writev: bool = pretend_older_kernel,
+};
+
+var features = Features{};
+
pub const Errno = error{
EPERM,
ENOENT,
@@ -444,6 +455,26 @@ completed: FIFO(Completion) = .{},
pub fn init(entries_: u12, flags: u32) !IO {
var ring: IO_Uring = undefined;
var entries = entries_;
+
+ const kernel = Platform.kernelVersion();
+ if (kernel.orderWithoutTag(@TypeOf(kernel){ .major = 5, .minor = 6, .patch = 0 }) == .lt) {
+ features.close_poll = true;
+ features.connect_poll = true;
+ features.replace_recv_with_readv = true;
+ features.replace_send_with_writev = true;
+ }
+
+ var limit = linux.rlimit{ .cur = 0, .max = 0 };
+ if (linux.getrlimit(.MEMLOCK, &limit) == 0) {
+ if (limit.cur < 16 * 1024) {
+ return error.@"memlock is too low. Please increase it to at least 64k";
+ }
+
+ if (limit.cur < 128 * 1024) {
+ entries = @minimum(256, entries);
+ }
+ }
+
while (true) {
ring = IO_Uring.init(entries, flags) catch |err| {
if (err == error.SystemResources) {
@@ -640,6 +671,12 @@ pub const Completion = struct {
op.address.getOsSockLen(),
);
},
+ .close_poll => |op| {
+ linux.io_uring_prep_poll_add(sqe, op.fd, linux.POLL.HUP | linux.POLL.IN | linux.POLL.OUT);
+ },
+ .connect_poll => |*op| {
+ linux.io_uring_prep_poll_add(sqe, op.socket, linux.POLL.HUP | linux.POLL.OUT);
+ },
.fsync => |op| {
linux.io_uring_prep_fsync(sqe, op.fd, 0);
},
@@ -651,6 +688,14 @@ pub const Completion = struct {
op.offset,
);
},
+ .readev => {
+ var op = &completion.operation.readev;
+ linux.io_uring_prep_readv(sqe, op.socket, &op.iovecs, 0);
+ },
+ .writev => {
+ var op = &completion.operation.writev;
+ linux.io_uring_prep_writev(sqe, op.socket, &op.iovecs, 0);
+ },
.recv => |op| {
linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL);
},
@@ -696,6 +741,21 @@ pub const Completion = struct {
} else @intCast(os.socket_t, completion.result);
completion.callback(completion.context, completion, &result);
},
+ .close_poll => {
+ var op = &completion.operation.close_poll;
+ const rc = linux.close(op.fd);
+ completion.result = @intCast(i32, rc);
+
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
+ os.EBADF => error.FileDescriptorInvalid,
+ os.EDQUOT => error.DiskQuota,
+ os.EIO => error.InputOutput,
+ os.ENOSPC => error.NoSpaceLeft,
+ else => |errno| asError(errno),
+ } else assert(completion.result == 0);
+ completion.callback(completion.context, completion, &result);
+ },
.close => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
@@ -707,6 +767,35 @@ pub const Completion = struct {
} else assert(completion.result == 0);
completion.callback(completion.context, completion, &result);
},
+ .connect_poll => {
+ var op = &completion.operation.connect_poll;
+ const rc = linux.connect(op.socket, &op.address.any, op.address.getOsSockLen());
+ completion.result = @intCast(i32, rc);
+
+ const result = if (completion.result < 0) switch (-completion.result) {
+ os.EAGAIN, os.EWOULDBLOCK, os.EINPROGRESS, os.EINTR => {
+ completion.io.enqueue(completion);
+ return;
+ },
+ os.EACCES => error.AccessDenied,
+ os.EADDRINUSE => error.AddressInUse,
+ os.EADDRNOTAVAIL => error.AddressNotAvailable,
+ os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
+ os.EALREADY => error.OpenAlreadyInProgress,
+ os.EBADF => error.FileDescriptorInvalid,
+ os.ECONNREFUSED => error.ConnectionRefused,
+ os.ECONNRESET => error.ConnectionResetByPeer,
+ os.EISCONN => error.AlreadyConnected,
+ os.ENETUNREACH => error.NetworkUnreachable,
+ os.ENOENT => error.FileNotFound,
+ os.ENOTSOCK => error.FileDescriptorNotASocket,
+ os.EPERM => error.PermissionDenied,
+ os.EPROTOTYPE => error.ProtocolNotSupported,
+ os.ETIMEDOUT => error.ConnectionTimedOut,
+ else => |errno| asError(errno),
+ } else assert(completion.result == 0);
+ completion.callback(completion.context, completion, &result);
+ },
.connect => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
@@ -770,7 +859,7 @@ pub const Completion = struct {
} else @intCast(usize, completion.result);
completion.callback(completion.context, completion, &result);
},
- .recv => {
+ .readev, .recv => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
completion.io.enqueue(completion);
@@ -787,7 +876,7 @@ pub const Completion = struct {
} else @intCast(usize, completion.result);
completion.callback(completion.context, completion, &result);
},
- .send => {
+ .writev, .send => {
const result = if (completion.result < 0) switch (-completion.result) {
os.EINTR => {
completion.io.enqueue(completion);
@@ -859,10 +948,17 @@ const Operation = union(enum) {
close: struct {
fd: os.fd_t,
},
+ close_poll: struct {
+ fd: os.fd_t,
+ },
connect: struct {
socket: os.socket_t,
address: std.net.Address,
},
+ connect_poll: struct {
+ socket: os.socket_t,
+ address: std.net.Address,
+ },
fsync: struct {
fd: os.fd_t,
},
@@ -871,6 +967,14 @@ const Operation = union(enum) {
buffer: []u8,
offset: u64,
},
+ readev: struct {
+ socket: os.socket_t,
+ iovecs: [1]os.iovec,
+ },
+ writev: struct {
+ socket: os.socket_t,
+ iovecs: [1]os.iovec_const,
+ },
recv: struct {
socket: os.socket_t,
buffer: []u8,
@@ -969,10 +1073,25 @@ pub fn close(
);
}
}.wrapper,
- .operation = .{
+ .operation = if (features.close_poll) .{
+ .close_poll = .{ .fd = fd },
+ } else .{
.close = .{ .fd = fd },
},
};
+
+ if (features.close_poll) {
+ const rc = linux.close(fd);
+ switch (linux.getErrno(rc)) {
+ .AGAIN, .INPROGRESS, .INTR => {},
+ else => {
+ completion.result = @intCast(i32, rc);
+ self.completed.push(completion);
+ return;
+ },
+ }
+ }
+
self.enqueue(completion);
}
@@ -1019,13 +1138,30 @@ pub fn connect(
);
}
}.wrapper,
- .operation = .{
+ .operation = if (features.connect_poll) .{
+ .connect_poll = .{
+ .socket = socket,
+ .address = address,
+ },
+ } else .{
.connect = .{
.socket = socket,
.address = address,
},
},
};
+
+ if (features.connect_poll) {
+ const rc = linux.connect(socket, &address.any, address.getOsSockLen());
+ switch (linux.getErrno(rc)) {
+ .AGAIN, .INPROGRESS, .INTR => {},
+ else => {
+ completion.result = @intCast(i32, rc);
+ self.completed.push(completion);
+ return;
+ },
+ }
+ }
self.enqueue(completion);
}
@@ -1141,6 +1277,11 @@ pub fn recv(
socket: os.socket_t,
buffer: []u8,
) void {
+ if (features.replace_recv_with_readv) {
+ readev(self, Context, context, callback, completion, socket, buffer);
+ return;
+ }
+
completion.* = .{
.io = self,
.context = context,
@@ -1163,6 +1304,41 @@ pub fn recv(
self.enqueue(completion);
}
+pub fn readev(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: RecvError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []u8,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const RecvError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .readev = .{
+ .socket = socket,
+ .iovecs = .{.{ .iov_base = buffer.ptr, .iov_len = buffer.len }},
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
pub const SendError = error{
AccessDenied,
WouldBlock,
@@ -1192,6 +1368,11 @@ pub fn send(
buffer: []const u8,
_: u32,
) void {
+ if (features.replace_send_with_writev) {
+ writev(self, Context, context, callback, completion, socket, buffer, 0);
+ return;
+ }
+
completion.* = .{
.io = self,
.context = context,
@@ -1214,6 +1395,44 @@ pub fn send(
self.enqueue(completion);
}
+pub fn writev(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: SendError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []const u8,
+ _: u32,
+) void {
+ completion.* = .{
+ .io = self,
+ .context = context,
+ .callback = struct {
+ fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
+ callback(
+ @intToPtr(Context, @ptrToInt(ctx)),
+ comp,
+ @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
+ );
+ }
+ }.wrapper,
+ .operation = .{
+ .writev = .{
+ .socket = socket,
+ .iovecs = .{
+ .{ .iov_base = buffer.ptr, .iov_len = buffer.len },
+ },
+ },
+ },
+ };
+ self.enqueue(completion);
+}
+
pub const TimeoutError = error{Canceled} || Errno;
pub fn timeout(
@@ -1314,7 +1533,7 @@ const SocketError = error{
const Syscall = struct {
pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!os.socket_t {
const rc = linux.socket(domain, socket_type, protocol);
- return switch (linux.getErrno(rc)) {
+ return switch (linux.getErrno((rc))) {
.SUCCESS => @intCast(os.fd_t, rc),
.ACCES => return error.PermissionDenied,
.AFNOSUPPORT => return error.AddressFamilyNotSupported,
@@ -1325,13 +1544,13 @@ const Syscall = struct {
.NOMEM => return error.SystemResources,
.PROTONOSUPPORT => return error.ProtocolNotSupported,
.PROTOTYPE => return error.SocketTypeNotSupported,
- else => |err| return asError(err),
+ else => |err| return asError(@enumToInt(err)),
};
}
};
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
- return Syscall.socket(family, sock_type, protocol);
+ return Syscall.socket(family, sock_type | os.O.NONBLOCK | os.O.CLOEXEC, protocol);
}
pub var global: IO = undefined;