Diffstat (limited to 'src/io/io_windows.zig')
-rw-r--r--  src/io/io_windows.zig  1331
1 file changed, 1331 insertions, 0 deletions
diff --git a/src/io/io_windows.zig b/src/io/io_windows.zig
new file mode 100644
index 000000000..55803d7ea
--- /dev/null
+++ b/src/io/io_windows.zig
@@ -0,0 +1,1331 @@
+/// Thanks to TigerBeetle - https://github.com/tigerbeetle/tigerbeetle/blob/532c8b70b9142c17e07737ab6d3da68d7500cbca/src/io/windows.zig#L1
+/// Apache-2.0 license - https://github.com/tigerbeetle/tigerbeetle/blob/532c8b70b9142c17e07737ab6d3da68d7500cbca/LICENSE
+const std = @import("std");
+const os = std.os;
+const assert = std.debug.assert;
+const log = std.log.scoped(.io);
+const bun = @import("root").bun;
+const FIFO = @import("./fifo.zig").FIFO;
+const windows = bun.windows;
+
+const Time = struct {
+ const Self = @This();
+
+ /// Hardware and/or software bugs can mean that the monotonic clock may regress.
+ /// One example (of many): https://bugzilla.redhat.com/show_bug.cgi?id=448449
+ /// We crash the process for safety if this ever happens, to protect against infinite loops.
+ /// It's better to crash and come back with a valid monotonic clock than get stuck forever.
+ monotonic_guard: u64 = 0,
+
+ /// A timestamp to measure elapsed time, meaningful only on the same system, not across reboots.
+ /// Always use a monotonic timestamp if the goal is to measure elapsed time.
+ /// This clock is not affected by discontinuous jumps in the system time, for example if the
+ /// system administrator manually changes the clock.
+ pub fn monotonic(self: *Self) u64 {
+ const m = blk: {
+            // Uses QueryPerformanceCounter() on Windows because it is the highest-precision timer
+            // available that also, by default, accounts for time spent suspended:
+ // https://docs.microsoft.com/en-us/windows/win32/api/realtimeapiset/nf-realtimeapiset-queryunbiasedinterrupttime#remarks
+
+            // QPF need not be globally cached either, as it ends up being a load from a
+            // read-only page the kernel maps into all processes, KUSER_SHARED_DATA (see "QpcFrequency"):
+ // https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-kuser_shared_data
+ // https://www.geoffchappell.com/studies/windows/km/ntoskrnl/inc/api/ntexapi_x/kuser_shared_data/index.htm
+ const qpc = os.windows.QueryPerformanceCounter();
+ const qpf = os.windows.QueryPerformanceFrequency();
+
+            // 10 MHz (1 qpc tick every 100ns) is a common QPF on modern systems.
+ // We can optimize towards this by converting to ns via a single multiply.
+ // https://github.com/microsoft/STL/blob/785143a0c73f030238ef618890fd4d6ae2b3a3a0/stl/inc/chrono#L694-L701
+ const common_qpf = 10_000_000;
+ if (qpf == common_qpf) break :blk qpc * (std.time.ns_per_s / common_qpf);
+
+ // Convert qpc to nanos using fixed point to avoid expensive extra divs and overflow.
+ const scale = (std.time.ns_per_s << 32) / qpf;
+ break :blk @as(u64, @truncate((@as(u96, qpc) * scale) >> 32));
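+            // Worked example with a hypothetical qpf of 2_000_000 ticks/s: scale becomes
+            // (1e9 << 32) / 2e6 = 500 << 32, so a qpc of 2_000_000 ticks maps to
+            // (2_000_000 * (500 << 32)) >> 32 = 1_000_000_000 ns, i.e. exactly one second.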
+ };
+
+ // "Oops!...I Did It Again"
+ if (m < self.monotonic_guard) @panic("a hardware/kernel bug regressed the monotonic clock");
+ self.monotonic_guard = m;
+ return m;
+ }
+
+ /// A timestamp to measure real (i.e. wall clock) time, meaningful across systems, and reboots.
+ /// This clock is affected by discontinuous jumps in the system time.
+ pub fn realtime(_: *Self) i64 {
+ const kernel32 = struct {
+ extern "kernel32" fn GetSystemTimePreciseAsFileTime(
+ lpFileTime: *os.windows.FILETIME,
+ ) callconv(os.windows.WINAPI) void;
+ };
+
+ var ft: os.windows.FILETIME = undefined;
+ kernel32.GetSystemTimePreciseAsFileTime(&ft);
+ const ft64 = (@as(u64, ft.dwHighDateTime) << 32) | ft.dwLowDateTime;
+
+ // FileTime is in units of 100 nanoseconds
+ // and uses the NTFS/Windows epoch of 1601-01-01 instead of Unix Epoch 1970-01-01.
+ const epoch_adjust = std.time.epoch.windows * (std.time.ns_per_s / 100);
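+        // std.time.epoch.windows is 1601-01-01 expressed in seconds relative to the Unix
+        // epoch (a negative value), so epoch_adjust is in 100ns ticks and the final
+        // multiply by 100 yields nanoseconds since the Unix epoch.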
+ return (@as(i64, @bitCast(ft64)) + epoch_adjust) * 100;
+ }
+
+ pub fn tick(_: *Self) void {}
+};
+pub const system = os.windows;
+
+pub const CloseError = error{
+ FileDescriptorInvalid,
+ DiskQuota,
+ InputOutput,
+ NoSpaceLeft,
+} || os.UnexpectedError;
+pub const WriteError = os.PWriteError;
+pub const ConnectError = os.ConnectError || error{FileDescriptorNotASocket};
+
+pub const Errno = bun.C.SystemErrno.Error;
+
+pub fn asError(err: anytype) Errno {
+ if (bun.C.SystemErrno.init(err)) |e| {
+ return e.toError();
+ } else {
+ return error.Unexpected;
+ }
+}
+
+pub const Waker = struct {
+ iocp: os.windows.HANDLE,
+
+ pub const completion_key = std.math.maxInt(isize) - 24;
+
+ const kernel32 = os.windows.kernel32;
+ pub fn init(_: std.mem.Allocator) !Waker {
+ _ = try os.windows.WSAStartup(2, 2);
+ errdefer os.windows.WSACleanup() catch unreachable;
+
+ const iocp = try os.windows.CreateIoCompletionPort(os.windows.INVALID_HANDLE_VALUE, null, completion_key, 0);
+ return Waker{
+ .iocp = iocp,
+ };
+ }
+
+ pub fn initWithFileDescriptor(_: std.mem.Allocator, fd: bun.FileDescriptor) Waker {
+ return Waker{
+ .iocp = bun.fdcast(fd),
+ };
+ }
+
+ pub fn wait(this: Waker) !u64 {
+ var overlapped = [_]os.windows.OVERLAPPED_ENTRY{std.mem.zeroes(os.windows.OVERLAPPED_ENTRY)} ** 1;
+ var removed: u32 = 0;
+ _ = kernel32.GetQueuedCompletionStatusEx(this.iocp, &overlapped, 1, &removed, 0, 1);
+ return 0;
+ }
+
+ pub fn wake(this: Waker) !void {
+ var overlapped: os.windows.OVERLAPPED = std.mem.zeroes(os.windows.OVERLAPPED);
+ _ = kernel32.PostQueuedCompletionStatus(this.iocp, 1, completion_key, &overlapped);
+ }
+};
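+
+// Minimal usage sketch for Waker (illustrative; `allocator` is assumed to be supplied by
+// the caller):
+//
+//     var waker = try Waker.init(allocator);
+//     try waker.wake();     // posts a completion packet keyed with completion_key
+//     _ = try waker.wait(); // polls the port once with a 0ms timeout for at most one entry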
+
+pub fn wait(this: *IO, context: anytype, comptime function: anytype) void {
+ this.flush(.blocking) catch unreachable;
+ function(context);
+}
+
+/// This struct holds the data needed for a single IO operation
+pub const Completion = struct {
+ next: ?*Completion,
+ context: ?*anyopaque,
+ callback: *const fn (Context) void,
+ operation: Operation,
+
+ const Context = struct {
+ io: *IO,
+ completion: *Completion,
+ };
+
+ const Overlapped = struct {
+ raw: os.windows.OVERLAPPED,
+ completion: *Completion,
+ };
+
+ const Transfer = struct {
+ socket: os.socket_t,
+ buf: os.windows.ws2_32.WSABUF,
+ overlapped: Overlapped,
+ pending: bool,
+ };
+
+ const Operation = union(enum) {
+ accept: struct {
+ overlapped: Overlapped,
+ listen_socket: os.socket_t,
+ client_socket: os.socket_t,
+ addr_buffer: [(@sizeOf(std.net.Address) + 16) * 2]u8 align(4),
+ },
+ open: struct {
+ path: [:0]const u8,
+ flags: bun.JSC.Node.Mode,
+ },
+ connect: struct {
+ socket: os.socket_t,
+ address: std.net.Address,
+ overlapped: Overlapped,
+ pending: bool,
+ },
+ send: Transfer,
+ recv: Transfer,
+ read: struct {
+ fd: bun.FileDescriptor,
+ buf: [*]u8,
+ len: u32,
+ offset: ?u64,
+ },
+ write: struct {
+ fd: bun.FileDescriptor,
+ buf: [*]const u8,
+ len: u32,
+ offset: ?u64,
+ },
+ close: struct {
+ fd: bun.FileDescriptor,
+ },
+ timeout: struct {
+ deadline: u64,
+ },
+ };
+};
+
+fn buffer_limit(buffer_len: usize) usize {
+
+ // Linux limits how much may be written in a `pwrite()/pread()` call, which is `0x7ffff000` on
+ // both 64-bit and 32-bit systems, due to using a signed C int as the return value, as well as
+ // stuffing the errno codes into the last `4096` values.
+ // Darwin limits writes to `0x7fffffff` bytes, more than that returns `EINVAL`.
+ // The corresponding POSIX limit is `std.math.maxInt(isize)`.
+ const limit = switch (@import("builtin").target.os.tag) {
+ .linux => 0x7ffff000,
+ .macos, .ios, .watchos, .tvos => std.math.maxInt(i32),
+ else => std.math.maxInt(isize),
+ };
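+    // For example (illustrative): a 4 GiB buffer is capped to 0x7ffff000 bytes on Linux
+    // and to maxInt(isize) bytes on Windows via the `else` branch.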
+ return @min(limit, buffer_len);
+}
+
+iocp: os.windows.HANDLE,
+timer: Time = .{},
+io_pending: usize = 0,
+timeouts: FIFO(Completion) = .{},
+completed: FIFO(Completion) = .{},
+
+pub const IO = @This();
+
+pub fn init(_: u12, _: u32, waker: Waker) !IO {
+ return IO{ .iocp = waker.iocp };
+}
+
+pub fn deinit(self: *IO) void {
+ assert(self.iocp != os.windows.INVALID_HANDLE_VALUE);
+ os.windows.CloseHandle(self.iocp);
+ self.iocp = os.windows.INVALID_HANDLE_VALUE;
+
+ os.windows.WSACleanup() catch unreachable;
+}
+
+pub fn tick(self: *IO) !void {
+ return self.flush(.non_blocking);
+}
+
+pub fn run_for_ns(self: *IO, nanoseconds: u63) !void {
+ const Callback = struct {
+ fn on_timeout(timed_out: *bool, completion: *Completion, result: TimeoutError!void) void {
+ _ = result catch unreachable;
+ _ = completion;
+ timed_out.* = true;
+ }
+ };
+
+ var timed_out = false;
+ var completion: Completion = undefined;
+ self.timeout(*bool, &timed_out, Callback.on_timeout, &completion, nanoseconds);
+
+ while (!timed_out) {
+ try self.flush(.blocking);
+ }
+}
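+
+// Usage sketch (illustrative; `allocator` is assumed to come from the caller):
+//
+//     var io = try IO.init(0, 0, try Waker.init(allocator));
+//     try io.run_for_ns(1 * std.time.ns_per_ms); // pump completions for roughly 1ms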
+
+const FlushMode = enum {
+ blocking,
+ non_blocking,
+};
+
+fn flush(self: *IO, comptime mode: FlushMode) anyerror!void {
+ if (self.completed.peek() == null) {
+ // Compute how long to poll by flushing timeout completions.
+ // NOTE: this may push to completed queue
+ var timeout_ms: ?os.windows.DWORD = null;
+ if (self.flush_timeouts()) |expires_ns| {
+ // 0ns expires should have been completed not returned
+ assert(expires_ns != 0);
+            // Round sub-millisecond expire times to the nearest millisecond
+ const expires_ms = (expires_ns + (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
+ // Saturating cast to DWORD milliseconds
+ const expires = std.math.cast(os.windows.DWORD, expires_ms) orelse std.math.maxInt(os.windows.DWORD);
+ // max DWORD is reserved for INFINITE so cap the cast at max - 1
+ timeout_ms = if (expires == os.windows.INFINITE) expires - 1 else expires;
+ }
+
+        // Poll for IO iff there's IO pending and flush_timeouts() found no ready completions
+ if (self.io_pending > 0 and self.completed.peek() == null) {
+            // In blocking mode, we're always waiting at least until the timeout set by run_for_ns().
+ // In non-blocking mode, we shouldn't wait at all.
+ const io_timeout = switch (mode) {
+ .blocking => timeout_ms orelse 0,
+ .non_blocking => 0,
+ };
+
+ var events: [64]os.windows.OVERLAPPED_ENTRY = undefined;
+ const num_events = try os.windows.GetQueuedCompletionStatusEx(
+ self.iocp,
+ &events,
+ io_timeout,
+ false, // non-alertable wait
+ );
+
+ assert(self.io_pending >= num_events);
+ self.io_pending -= num_events;
+
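+            // Each dequeued OVERLAPPED pointer lives inside a Completion.Overlapped, so
+            // @fieldParentPtr recovers the owning Completion, which is then queued so its
+            // callback runs in the completion pass below.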
+ for (events[0..num_events]) |event| {
+ const raw_overlapped = event.lpOverlapped;
+ const overlapped = @fieldParentPtr(Completion.Overlapped, "raw", raw_overlapped);
+ const completion = overlapped.completion;
+ completion.next = null;
+ self.completed.push(completion);
+ }
+ }
+ }
+
+ // Dequeue and invoke all the completions currently ready.
+ // Must read all `completions` before invoking the callbacks
+ // as the callbacks could potentially submit more completions.
+ var completed = self.completed;
+ self.completed = .{};
+ while (completed.pop()) |completion| {
+ (completion.callback)(Completion.Context{
+ .io = self,
+ .completion = completion,
+ });
+ }
+}
+
+fn flush_timeouts(self: *IO) ?u64 {
+ var min_expires: ?u64 = null;
+ var current_time: ?u64 = null;
+ var timeouts: ?*Completion = self.timeouts.peek();
+
+ // iterate through the timeouts, returning min_expires at the end
+ while (timeouts) |completion| {
+ timeouts = completion.next;
+
+ // lazily get the current time
+ const now = current_time orelse self.timer.monotonic();
+ current_time = now;
+
+ // move the completion to completed if it expired
+ if (now >= completion.operation.timeout.deadline) {
+ self.timeouts.remove(completion);
+ self.completed.push(completion);
+ continue;
+ }
+
+ // if it's still waiting, update min_timeout
+ const expires = completion.operation.timeout.deadline - now;
+ if (min_expires) |current_min_expires| {
+ min_expires = @min(expires, current_min_expires);
+ } else {
+ min_expires = expires;
+ }
+ }
+
+ return min_expires;
+}
+
+fn submit(
+ self: *IO,
+ context: anytype,
+ comptime callback: anytype,
+ completion: *Completion,
+ comptime op_tag: std.meta.Tag(Completion.Operation),
+ op_data: anytype,
+ comptime OperationImpl: type,
+) void {
+ const Context = @TypeOf(context);
+ const Callback = struct {
+ fn onComplete(ctx: Completion.Context) void {
+ // Perform the operation and get the result
+ const data = &@field(ctx.completion.operation, @tagName(op_tag));
+ const result = OperationImpl.do_operation(ctx, data);
+
+ // For OVERLAPPED IO, error.WouldBlock assumes that it will be completed by IOCP.
+ switch (op_tag) {
+ .accept, .read, .recv, .connect, .write, .send => {
+ _ = result catch |err| switch (err) {
+ error.WouldBlock => {
+ ctx.io.io_pending += 1;
+ return;
+ },
+ else => {},
+ };
+ },
+ else => {},
+ }
+
+ // The completion is finally ready to invoke the callback
+ callback(
+ @as(Context, @ptrFromInt(@intFromPtr(ctx.completion.context))),
+ ctx.completion,
+ result,
+ );
+ }
+ };
+
+ // Setup the completion with the callback wrapper above
+ completion.* = .{
+ .next = null,
+ .context = @as(?*anyopaque, @ptrCast(context)),
+ .callback = Callback.onComplete,
+ .operation = @unionInit(Completion.Operation, @tagName(op_tag), op_data),
+ };
+
+ // Submit the completion onto the right queue
+ switch (op_tag) {
+ .timeout => self.timeouts.push(completion),
+ else => self.completed.push(completion),
+ }
+}
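+
+// Sketch of the submit() flow (descriptive, not normative): non-timeout completions are
+// pushed onto `completed` and their do_operation() is attempted inline from flush(); when
+// an operation returns error.WouldBlock, the kernel owns the embedded OVERLAPPED and
+// `io_pending` is bumped, so flush() re-queues the Completion once its IOCP packet arrives
+// and do_operation() runs a second time to collect the result (WSAGetOverlappedResult).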
+
+pub const OpenError = bun.C.SystemErrno.Error;
+
+pub fn open(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: *const fn (
+ context: Context,
+ completion: *Completion,
+ result: OpenError!bun.FileDescriptor,
+ ) void,
+ completion: *Completion,
+ path: [:0]const u8,
+ flags: bun.JSC.Node.Mode,
+ _: bun.JSC.Node.Mode,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .open,
+ .{
+ .path = path,
+ .flags = flags,
+ },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) OpenError!bun.FileDescriptor {
+ _ = ctx;
+ const result = bun.sys.openat(bun.invalid_fd, op.path, op.flags, 0);
+ try result.throw();
+ return result.result;
+ }
+ },
+ );
+}
+
+pub fn accept(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: AcceptError!os.socket_t,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .accept,
+ .{
+ .overlapped = undefined,
+ .listen_socket = socket,
+ .client_socket = INVALID_SOCKET,
+ .addr_buffer = undefined,
+ },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) AcceptError!os.socket_t {
+ var flags: os.windows.DWORD = undefined;
+ var transferred: os.windows.DWORD = undefined;
+
+ const rc = switch (op.client_socket) {
+ // When first called, the client_socket is invalid so we start the op.
+ INVALID_SOCKET => blk: {
+ // Create the socket that will be used for accept.
+ op.client_socket = ctx.io.open_socket(
+ os.AF.INET,
+ os.SOCK.STREAM,
+ os.IPPROTO.TCP,
+ ) catch |err| switch (err) {
+ error.AddressFamilyNotSupported, error.ProtocolNotSupported => unreachable,
+ else => |e| return e,
+ };
+
+ var sync_bytes_read: os.windows.DWORD = undefined;
+ op.overlapped = .{
+ .raw = std.mem.zeroes(os.windows.OVERLAPPED),
+ .completion = ctx.completion,
+ };
+
+ // Start the asynchronous accept with the created socket.
+ break :blk os.windows.ws2_32.AcceptEx(
+ op.listen_socket,
+ op.client_socket,
+ &op.addr_buffer,
+ 0,
+ @sizeOf(std.net.Address) + 16,
+ @sizeOf(std.net.Address) + 16,
+ &sync_bytes_read,
+ &op.overlapped.raw,
+ );
+ },
+ // Called after accept was started, so get the result
+ else => os.windows.ws2_32.WSAGetOverlappedResult(
+ op.listen_socket,
+ &op.overlapped.raw,
+ &transferred,
+ os.windows.FALSE, // dont wait
+ &flags,
+ ),
+ };
+
+ // return the socket if we succeed in accepting.
+ if (rc != os.windows.FALSE) {
+ // enables getsockopt, setsockopt, getsockname, getpeername
+ _ = os.windows.ws2_32.setsockopt(
+ op.client_socket,
+ os.windows.ws2_32.SOL.SOCKET,
+ os.windows.ws2_32.SO.UPDATE_ACCEPT_CONTEXT,
+ null,
+ 0,
+ );
+
+ return op.client_socket;
+ }
+
+ // destroy the client_socket we created if we get a non WouldBlock error
+ errdefer |result| {
+ _ = result catch |err| switch (err) {
+ error.WouldBlock => {},
+ else => {
+ os.closeSocket(op.client_socket);
+ op.client_socket = INVALID_SOCKET;
+ },
+ };
+ }
+
+ return switch (os.windows.ws2_32.WSAGetLastError()) {
+ .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
+ .WSANOTINITIALISED => unreachable, // WSAStartup() was called
+ .WSAENETDOWN => unreachable, // WinSock error
+ .WSAENOTSOCK => error.FileDescriptorNotASocket,
+ .WSAEOPNOTSUPP => error.OperationNotSupported,
+ .WSA_INVALID_HANDLE => unreachable, // we dont use hEvent in OVERLAPPED
+ .WSAEFAULT, .WSA_INVALID_PARAMETER => unreachable, // params should be ok
+ .WSAECONNRESET => error.ConnectionAborted,
+                    .WSAEMFILE => unreachable, // we create our own descriptor so it's available
+ .WSAENOBUFS => error.SystemResources,
+ .WSAEINTR, .WSAEINPROGRESS => unreachable, // no blocking calls
+ else => |err| os.windows.unexpectedWSAError(err),
+ };
+ }
+ },
+ );
+}
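+
+// Usage sketch (illustrative names; `Server` and `on_accept` are hypothetical):
+//
+//     fn on_accept(server: *Server, completion: *IO.Completion, result: IO.AcceptError!os.socket_t) void {
+//         // handle the accepted socket or the error here
+//     }
+//     io.accept(*Server, server, on_accept, &completion, listen_socket);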
+
+pub fn connect(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ConnectError!void,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ address: std.net.Address,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .connect,
+ .{
+ .socket = socket,
+ .address = address,
+ .overlapped = undefined,
+ .pending = false,
+ },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) ConnectError!void {
+ var flags: os.windows.DWORD = undefined;
+ var transferred: os.windows.DWORD = undefined;
+
+ const rc = blk: {
+ // Poll for the result if we've already started the connect op.
+ if (op.pending) {
+ break :blk os.windows.ws2_32.WSAGetOverlappedResult(
+ op.socket,
+ &op.overlapped.raw,
+ &transferred,
+ os.windows.FALSE, // dont wait
+ &flags,
+ );
+ }
+
+ // ConnectEx requires the socket to be initially bound (INADDR_ANY)
+ const inaddr_any = std.mem.zeroes([4]u8);
+ const bind_addr = std.net.Address.initIp4(inaddr_any, 0);
+ os.bind(
+ op.socket,
+ &bind_addr.any,
+ bind_addr.getOsSockLen(),
+ ) catch |err| switch (err) {
+ error.AccessDenied => unreachable,
+ error.SymLinkLoop => unreachable,
+ error.NameTooLong => unreachable,
+ error.NotDir => unreachable,
+ error.ReadOnlyFileSystem => unreachable,
+ error.NetworkSubsystemFailed => unreachable,
+ error.AlreadyBound => unreachable,
+ else => |e| return e,
+ };
+
+ const LPFN_CONNECTEX = fn (
+ Socket: os.windows.ws2_32.SOCKET,
+ SockAddr: *const os.windows.ws2_32.sockaddr,
+ SockLen: os.socklen_t,
+ SendBuf: ?*const anyopaque,
+ SendBufLen: os.windows.DWORD,
+ BytesSent: *os.windows.DWORD,
+ Overlapped: *os.windows.OVERLAPPED,
+ ) callconv(os.windows.WINAPI) os.windows.BOOL;
+
+ // Find the ConnectEx function by dynamically looking it up on the socket.
+ const connect_ex = os.windows.loadWinsockExtensionFunction(
+ LPFN_CONNECTEX,
+ op.socket,
+ os.windows.ws2_32.WSAID_CONNECTEX,
+ ) catch |err| switch (err) {
+ error.OperationNotSupported => unreachable,
+ error.ShortRead => unreachable,
+ else => |e| return e,
+ };
+
+ op.pending = true;
+ op.overlapped = .{
+ .raw = std.mem.zeroes(os.windows.OVERLAPPED),
+ .completion = ctx.completion,
+ };
+
+ // Start the connect operation.
+ break :blk (connect_ex)(
+ op.socket,
+ &op.address.any,
+ op.address.getOsSockLen(),
+ null,
+ 0,
+ &transferred,
+ &op.overlapped.raw,
+ );
+ };
+
+ // return if we succeeded in connecting
+ if (rc != os.windows.FALSE) {
+ // enables getsockopt, setsockopt, getsockname, getpeername
+ _ = os.windows.ws2_32.setsockopt(
+ op.socket,
+ os.windows.ws2_32.SOL.SOCKET,
+ os.windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT,
+ null,
+ 0,
+ );
+
+ return;
+ }
+
+ return switch (os.windows.ws2_32.WSAGetLastError()) {
+ .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE, .WSAEALREADY => error.WouldBlock,
+ .WSANOTINITIALISED => unreachable, // WSAStartup() was called
+ .WSAENETDOWN => unreachable, // network subsystem is down
+ .WSAEADDRNOTAVAIL => error.AddressNotAvailable,
+ .WSAEAFNOSUPPORT => error.AddressFamilyNotSupported,
+ .WSAECONNREFUSED => error.ConnectionRefused,
+ .WSAEFAULT => unreachable, // all addresses should be valid
+ .WSAEINVAL => unreachable, // invalid socket type
+ .WSAEHOSTUNREACH, .WSAENETUNREACH => error.NetworkUnreachable,
+ .WSAENOBUFS => error.SystemResources,
+ .WSAENOTSOCK => unreachable, // socket is not bound or is listening
+ .WSAETIMEDOUT => error.ConnectionTimedOut,
+ .WSA_INVALID_HANDLE => unreachable, // we dont use hEvent in OVERLAPPED
+ else => |err| os.windows.unexpectedWSAError(err),
+ };
+ }
+ },
+ );
+}
+
+pub const SendError = os.SendError;
+
+pub fn send(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: SendError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []const u8,
+) void {
+ const transfer = Completion.Transfer{
+ .socket = socket,
+ .buf = os.windows.ws2_32.WSABUF{
+ .len = @as(u32, @intCast(buffer_limit(buffer.len))),
+ .buf = @as([*]u8, @ptrFromInt(@intFromPtr(buffer.ptr))),
+ },
+ .overlapped = undefined,
+ .pending = false,
+ };
+
+ self.submit(
+ context,
+ callback,
+ completion,
+ .send,
+ transfer,
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) SendError!usize {
+ var flags: os.windows.DWORD = undefined;
+ var transferred: os.windows.DWORD = undefined;
+
+ const rc = blk: {
+ // Poll for the result if we've already started the send op.
+ if (op.pending) {
+ break :blk os.windows.ws2_32.WSAGetOverlappedResult(
+ op.socket,
+ &op.overlapped.raw,
+ &transferred,
+ os.windows.FALSE, // dont wait
+ &flags,
+ );
+ }
+
+ op.pending = true;
+ op.overlapped = .{
+ .raw = std.mem.zeroes(os.windows.OVERLAPPED),
+ .completion = ctx.completion,
+ };
+
+ // Start the send operation.
+ break :blk switch (os.windows.ws2_32.WSASend(
+ op.socket,
+ @as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)),
+ 1, // one buffer
+ &transferred,
+ 0, // no flags
+ &op.overlapped.raw,
+ null,
+ )) {
+ os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE),
+ 0 => os.windows.TRUE,
+ else => unreachable,
+ };
+ };
+
+ // Return bytes transferred on success.
+ if (rc != os.windows.FALSE)
+ return transferred;
+
+ return switch (os.windows.ws2_32.WSAGetLastError()) {
+ .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
+ .WSANOTINITIALISED => unreachable, // WSAStartup() was called
+ .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent
+ .WSA_INVALID_PARAMETER => unreachable, // parameters are fine
+ .WSAECONNABORTED => error.ConnectionResetByPeer,
+ .WSAECONNRESET => error.ConnectionResetByPeer,
+ .WSAEFAULT => unreachable, // invalid buffer
+ .WSAEINTR => unreachable, // this is non blocking
+ .WSAEINPROGRESS => unreachable, // this is non blocking
+ .WSAEINVAL => unreachable, // invalid socket type
+ .WSAEMSGSIZE => error.MessageTooBig,
+ .WSAENETDOWN => error.NetworkSubsystemFailed,
+ .WSAENETRESET => error.ConnectionResetByPeer,
+ .WSAENOBUFS => error.SystemResources,
+ .WSAENOTCONN => error.FileDescriptorNotASocket,
+ .WSAEOPNOTSUPP => unreachable, // we dont use MSG_OOB or MSG_PARTIAL
+ .WSAESHUTDOWN => error.BrokenPipe,
+ .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled
+ else => |err| os.windows.unexpectedWSAError(err),
+ };
+ }
+ },
+ );
+}
+
+pub const RecvError = os.RecvFromError;
+
+pub fn recv(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: RecvError!usize,
+ ) void,
+ completion: *Completion,
+ socket: os.socket_t,
+ buffer: []u8,
+) void {
+ const transfer = Completion.Transfer{
+ .socket = socket,
+ .buf = os.windows.ws2_32.WSABUF{
+ .len = @as(u32, @intCast(buffer_limit(buffer.len))),
+ .buf = buffer.ptr,
+ },
+ .overlapped = undefined,
+ .pending = false,
+ };
+
+ self.submit(
+ context,
+ callback,
+ completion,
+ .recv,
+ transfer,
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) RecvError!usize {
+ var flags: os.windows.DWORD = 0; // used both as input and output
+ var transferred: os.windows.DWORD = undefined;
+
+ const rc = blk: {
+ // Poll for the result if we've already started the recv op.
+ if (op.pending) {
+ break :blk os.windows.ws2_32.WSAGetOverlappedResult(
+ op.socket,
+ &op.overlapped.raw,
+ &transferred,
+ os.windows.FALSE, // dont wait
+ &flags,
+ );
+ }
+
+ op.pending = true;
+ op.overlapped = .{
+ .raw = std.mem.zeroes(os.windows.OVERLAPPED),
+ .completion = ctx.completion,
+ };
+
+ // Start the recv operation.
+ break :blk switch (os.windows.ws2_32.WSARecv(
+ op.socket,
+ @as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)),
+ 1, // one buffer
+ &transferred,
+ &flags,
+ &op.overlapped.raw,
+ null,
+ )) {
+ os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE),
+ 0 => os.windows.TRUE,
+ else => unreachable,
+ };
+ };
+
+ // Return bytes received on success.
+ if (rc != os.windows.FALSE)
+ return transferred;
+
+ return switch (os.windows.ws2_32.WSAGetLastError()) {
+ .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
+ .WSANOTINITIALISED => unreachable, // WSAStartup() was called
+ .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent
+ .WSA_INVALID_PARAMETER => unreachable, // parameters are fine
+ .WSAECONNABORTED => error.ConnectionRefused,
+ .WSAECONNRESET => error.ConnectionResetByPeer,
+                    .WSAEDISCON => unreachable, // we only use stream sockets
+ .WSAEFAULT => unreachable, // invalid buffer
+ .WSAEINTR => unreachable, // this is non blocking
+ .WSAEINPROGRESS => unreachable, // this is non blocking
+ .WSAEINVAL => unreachable, // invalid socket type
+ .WSAEMSGSIZE => error.MessageTooBig,
+ .WSAENETDOWN => error.NetworkSubsystemFailed,
+ .WSAENETRESET => error.ConnectionResetByPeer,
+ .WSAENOTCONN => error.SocketNotConnected,
+ .WSAEOPNOTSUPP => unreachable, // we dont use MSG_OOB or MSG_PARTIAL
+ .WSAESHUTDOWN => error.SocketNotConnected,
+ .WSAETIMEDOUT => error.ConnectionRefused,
+ .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled
+ else => |err| os.windows.unexpectedWSAError(err),
+ };
+ }
+ },
+ );
+}
+
+pub fn read(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: ReadError!usize,
+ ) void,
+ completion: *Completion,
+ fd: bun.FileDescriptor,
+ buffer: []u8,
+ offset: ?u64,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .read,
+ .{
+ .fd = fd,
+ .buf = buffer.ptr,
+ .len = @as(u32, @intCast(buffer_limit(buffer.len))),
+ .offset = offset,
+ },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) ReadError!usize {
+ // Do a synchronous read for now.
+ _ = ctx;
+ if (op.offset) |o| {
+ return os.pread(bun.fdcast(op.fd), op.buf[0..op.len], o) catch |err| switch (err) {
+ error.OperationAborted => unreachable,
+ error.BrokenPipe => unreachable,
+ error.ConnectionTimedOut => unreachable,
+ error.AccessDenied => error.InputOutput,
+ else => |e| e,
+ };
+ } else {
+ return os.read(bun.fdcast(op.fd), op.buf[0..op.len]) catch |err| switch (err) {
+ error.OperationAborted => unreachable,
+ error.BrokenPipe => unreachable,
+ error.ConnectionTimedOut => unreachable,
+ error.AccessDenied => error.InputOutput,
+ else => |e| e,
+ };
+ }
+ }
+ },
+ );
+}
+
+pub fn write(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: WriteError!usize,
+ ) void,
+ completion: *Completion,
+ fd: bun.FileDescriptor,
+ buffer: []const u8,
+ offset: u64,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .write,
+ .{
+ .fd = fd,
+ .buf = buffer.ptr,
+ .len = @as(u32, @intCast(buffer_limit(buffer.len))),
+ .offset = offset,
+ },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) WriteError!usize {
+ // Do a synchronous write for now.
+ _ = ctx;
+ if (op.offset) |off| {
+ return os.pwrite(bun.fdcast(op.fd), op.buf[0..op.len], off);
+ } else {
+ return os.write(bun.fdcast(op.fd), op.buf[0..op.len]);
+ }
+ }
+ },
+ );
+}
+
+pub fn close(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: CloseError!void,
+ ) void,
+ completion: *Completion,
+ fd: bun.FileDescriptor,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .close,
+ .{ .fd = fd },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) CloseError!void {
+ _ = ctx;
+
+ // Check if the fd is a SOCKET by seeing if getsockopt() returns ENOTSOCK
+ // https://stackoverflow.com/a/50981652
+ const socket = @as(os.socket_t, @ptrCast(bun.fdcast(op.fd)));
+ getsockoptError(socket) catch |err| switch (err) {
+ error.FileDescriptorNotASocket => return os.windows.CloseHandle(bun.fdcast(op.fd)),
+ else => {},
+ };
+
+ os.closeSocket(socket);
+ }
+ },
+ );
+}
+
+pub const TimeoutError = error{Canceled} || os.UnexpectedError;
+
+pub fn timeout(
+ self: *IO,
+ comptime Context: type,
+ context: Context,
+ comptime callback: fn (
+ context: Context,
+ completion: *Completion,
+ result: TimeoutError!void,
+ ) void,
+ completion: *Completion,
+ nanoseconds: u63,
+) void {
+ self.submit(
+ context,
+ callback,
+ completion,
+ .timeout,
+ .{ .deadline = self.timer.monotonic() + nanoseconds },
+ struct {
+ fn do_operation(ctx: Completion.Context, op: anytype) TimeoutError!void {
+ _ = ctx;
+ _ = op;
+ return;
+ }
+ },
+ );
+}
+
+pub const INVALID_SOCKET = os.windows.ws2_32.INVALID_SOCKET;
+
+/// Creates a socket that can be used for async operations with the IO instance.
+pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t {
+ // SOCK_NONBLOCK | SOCK_CLOEXEC
+ var flags: os.windows.DWORD = 0;
+ flags |= os.windows.ws2_32.WSA_FLAG_OVERLAPPED;
+ flags |= os.windows.ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
+
+ const socket = try os.windows.WSASocketW(
+ @as(i32, @bitCast(family)),
+ @as(i32, @bitCast(sock_type)),
+ @as(i32, @bitCast(protocol)),
+ null,
+ 0,
+ flags,
+ );
+ errdefer os.closeSocket(socket);
+
+ const socket_iocp = try os.windows.CreateIoCompletionPort(socket, self.iocp, 0, 0);
+ assert(socket_iocp == self.iocp);
+
+ // Ensure that synchronous IO completion doesn't queue an unneeded overlapped
+ // and that the event for the socket (WaitForSingleObject) doesn't need to be set.
+ var mode: os.windows.BYTE = 0;
+ mode |= os.windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
+ mode |= os.windows.FILE_SKIP_SET_EVENT_ON_HANDLE;
+
+ const handle = @as(os.windows.HANDLE, @ptrCast(socket));
+ try os.windows.SetFileCompletionNotificationModes(handle, mode);
+
+ return socket;
+}
+
+/// Opens a directory with read only access.
+pub fn open_dir(dir_path: [:0]const u8) !os.fd_t {
+ const dir = try std.fs.cwd().openDirZ(dir_path, .{});
+ return dir.fd;
+}
+
+/// Opens or creates a journal file:
+/// - For reading and writing.
+/// - For Direct I/O (required on Windows).
+/// - Obtains an advisory exclusive lock to the file descriptor.
+/// - Allocates the file contiguously on disk if this is supported by the file system.
+/// - Ensures that the file data is durable on disk.
+/// The caller is responsible for ensuring that the parent directory inode is durable.
+/// - Verifies that the file size matches the expected file size before returning.
+pub fn open_file(
+ self: *IO,
+ dir_handle: os.fd_t,
+ relative_path: [:0]const u8,
+ size: u64,
+ must_create: bool,
+) !os.fd_t {
+ _ = size;
+ _ = self;
+
+ assert(relative_path.len > 0);
+
+ const path_w = try os.windows.sliceToPrefixedFileW(relative_path);
+
+ // FILE_CREATE = O_CREAT | O_EXCL
+ var creation_disposition: os.windows.DWORD = 0;
+ if (must_create) {
+ log.info("creating \"{s}\"...", .{relative_path});
+ creation_disposition = os.windows.FILE_CREATE;
+ } else {
+ log.info("opening \"{s}\"...", .{relative_path});
+ creation_disposition = os.windows.OPEN_EXISTING;
+ }
+
+ // O_EXCL
+ var shared_mode: os.windows.DWORD = 0;
+
+ // O_RDWR
+ var access_mask: os.windows.DWORD = 0;
+ access_mask |= os.windows.GENERIC_READ;
+ access_mask |= os.windows.GENERIC_WRITE;
+
+ // O_DIRECT | O_DSYNC
+ var attributes: os.windows.DWORD = 0;
+ // attributes |= os.windows.FILE_FLAG_NO_BUFFERING;
+ // attributes |= os.windows.FILE_FLAG_WRITE_THROUGH;
+
+ // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file:
+ // assert((attributes & os.windows.FILE_FLAG_WRITE_THROUGH) > 0);
+
+ // TODO: Add ReadFileEx/WriteFileEx support.
+ // Not currently needed for O_DIRECT disk IO.
+ attributes |= os.windows.FILE_FLAG_OVERLAPPED;
+
+ const handle = os.windows.kernel32.CreateFileW(
+ path_w.span(),
+ access_mask,
+ shared_mode,
+ null, // no security attributes required
+ creation_disposition,
+ attributes,
+ null, // no existing template file
+ );
+
+ if (handle == os.windows.INVALID_HANDLE_VALUE) {
+ return switch (os.windows.kernel32.GetLastError()) {
+ .ACCESS_DENIED => error.AccessDenied,
+ else => |err| os.windows.unexpectedError(err),
+ };
+ }
+
+ errdefer os.windows.CloseHandle(handle);
+
+ // // Obtain an advisory exclusive lock
+ // // even when we haven't given shared access to other processes.
+ // fs_lock(handle, size) catch |err| switch (err) {
+ // error.WouldBlock => @panic("another process holds the data file lock"),
+ // else => return err,
+ // };
+
+ // // Ask the file system to allocate contiguous sectors for the file (if possible):
+ // if (must_create) {
+ // log.info("allocating {}...", .{std.fmt.fmtIntSizeBin(size)});
+ // fs_allocate(handle, size) catch {
+ // log.warn("file system failed to preallocate the file memory", .{});
+ // log.info("allocating by writing to the last sector of the file instead...", .{});
+
+ // const sector_size = config.sector_size;
+ // const sector: [sector_size]u8 align(sector_size) = [_]u8{0} ** sector_size;
+
+ // // Handle partial writes where the physical sector is less than a logical sector:
+ // const write_offset = size - sector.len;
+ // var written: usize = 0;
+ // while (written < sector.len) {
+ // written += try os.pwrite(handle, sector[written..], write_offset + written);
+ // }
+ // };
+ // }
+
+ // // The best fsync strategy is always to fsync before reading because this prevents us from
+ // // making decisions on data that was never durably written by a previously crashed process.
+ // // We therefore always fsync when we open the path, also to wait for any pending O_DSYNC.
+ // // Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out.
+ // try os.fsync(handle);
+
+ // We cannot fsync the directory handle on Windows.
+ // We have no way to open a directory with write access.
+ //
+ // try os.fsync(dir_handle);
+ _ = dir_handle;
+
+ return handle;
+}
+
+fn fs_lock(handle: bun.FileDescriptor, size: u64) !void {
+ // TODO: Look into using SetFileIoOverlappedRange() for better unbuffered async IO perf
+ // NOTE: Requires SeLockMemoryPrivilege.
+
+ const kernel32 = struct {
+ const LOCKFILE_EXCLUSIVE_LOCK = 0x2;
+        const LOCKFILE_FAIL_IMMEDIATELY = 0x1;
+
+ extern "kernel32" fn LockFileEx(
+ hFile: os.windows.HANDLE,
+ dwFlags: os.windows.DWORD,
+ dwReserved: os.windows.DWORD,
+ nNumberOfBytesToLockLow: os.windows.DWORD,
+ nNumberOfBytesToLockHigh: os.windows.DWORD,
+ lpOverlapped: ?*os.windows.OVERLAPPED,
+ ) callconv(os.windows.WINAPI) os.windows.BOOL;
+ };
+
+ // hEvent = null
+ // Offset & OffsetHigh = 0
+ var lock_overlapped = std.mem.zeroes(os.windows.OVERLAPPED);
+
+ // LOCK_EX | LOCK_NB
+ var lock_flags: os.windows.DWORD = 0;
+ lock_flags |= kernel32.LOCKFILE_EXCLUSIVE_LOCK;
+ lock_flags |= kernel32.LOCKFILE_FAIL_IMMEDIATELY;
+
+ const locked = kernel32.LockFileEx(
+ handle,
+ lock_flags,
+ 0, // reserved param is always zero
+ @as(u32, @truncate(size)), // low bits of size
+ @as(u32, @truncate(size >> 32)), // high bits of size
+ &lock_overlapped,
+ );
+
+ if (locked == os.windows.FALSE) {
+ return switch (os.windows.kernel32.GetLastError()) {
+ .IO_PENDING => error.WouldBlock,
+ else => |err| os.windows.unexpectedError(err),
+ };
+ }
+}
+
+fn fs_allocate(handle: os.fd_t, size: u64) !void {
+ // TODO: Look into using SetFileValidData() instead
+ // NOTE: Requires SE_MANAGE_VOLUME_NAME privilege
+
+ // Move the file pointer to the start + size
+ const seeked = os.windows.kernel32.SetFilePointerEx(
+ handle,
+ @as(i64, @intCast(size)),
+ null, // no reference to new file pointer
+ os.windows.FILE_BEGIN,
+ );
+
+ if (seeked == os.windows.FALSE) {
+ return switch (os.windows.kernel32.GetLastError()) {
+ .INVALID_HANDLE => unreachable,
+ .INVALID_PARAMETER => unreachable,
+ else => |err| os.windows.unexpectedError(err),
+ };
+ }
+
+ // Mark the moved file pointer (start + size) as the physical EOF.
+ const allocated = os.windows.kernel32.SetEndOfFile(handle);
+ if (allocated == os.windows.FALSE) {
+ const err = os.windows.kernel32.GetLastError();
+ return os.windows.unexpectedError(err);
+ }
+}
+
+// TODO: use os.getsockoptError when fixed for windows in stdlib
+fn getsockoptError(socket: os.socket_t) ConnectError!void {
+ var err_code: u32 = undefined;
+ var size: i32 = @sizeOf(u32);
+ const rc = os.windows.ws2_32.getsockopt(
+ socket,
+ os.SOL.SOCKET,
+ os.SO.ERROR,
+ std.mem.asBytes(&err_code),
+ &size,
+ );
+
+ if (rc != 0) {
+ switch (os.windows.ws2_32.WSAGetLastError()) {
+ .WSAENETDOWN => return error.NetworkUnreachable,
+ .WSANOTINITIALISED => unreachable, // WSAStartup() was never called
+ .WSAEFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space.
+ .WSAEINVAL => unreachable, // The level parameter is unknown or invalid
+ .WSAENOPROTOOPT => unreachable, // The option is unknown at the level indicated.
+ .WSAENOTSOCK => return error.FileDescriptorNotASocket,
+ else => |err| return os.windows.unexpectedWSAError(err),
+ }
+ }
+
+ assert(size == 4);
+ if (err_code == 0)
+ return;
+
+ const ws_err = @as(os.windows.ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_code))));
+ return switch (ws_err) {
+ .WSAEACCES => error.PermissionDenied,
+ .WSAEADDRINUSE => error.AddressInUse,
+ .WSAEADDRNOTAVAIL => error.AddressNotAvailable,
+ .WSAEAFNOSUPPORT => error.AddressFamilyNotSupported,
+ .WSAEALREADY => error.ConnectionPending,
+ .WSAEBADF => unreachable,
+ .WSAECONNREFUSED => error.ConnectionRefused,
+ .WSAEFAULT => unreachable,
+ .WSAEISCONN => unreachable, // error.AlreadyConnected,
+ .WSAENETUNREACH => error.NetworkUnreachable,
+ .WSAENOTSOCK => error.FileDescriptorNotASocket,
+ .WSAEPROTOTYPE => unreachable,
+ .WSAETIMEDOUT => error.ConnectionTimedOut,
+ .WSAECONNRESET => error.ConnectionResetByPeer,
+ else => |e| os.windows.unexpectedWSAError(e),
+ };
+}
+
+pub var global: IO = undefined;
+pub var global_loaded: bool = false;
+pub const AcceptError = os.AcceptError || os.SetSockOptError;
+
+pub const ReadError = error{
+ WouldBlock,
+ NotOpenForReading,
+ ConnectionResetByPeer,
+ Alignment,
+ InputOutput,
+ IsDir,
+ SystemResources,
+ Unseekable,
+} || os.UnexpectedError || os.PReadError;