author     2022-08-31 22:38:05 -0700
committer  2022-08-31 22:38:05 -0700
commit     ca9c87f9d76159a286469a230c84e7c5a2c3ef6b (patch)
tree       888b5b9c081630318e870fcf686368b19458a98a
parent     4528b9938f54ceeb6bf5d90a9266eb2070292623 (diff)
parent     a626a07ee86d7b5dd8441002b2c0ba0bf9b68220 (diff)
Merge branch 'jarred/usockets-direction' into jarred/new-http
-rw-r--r--  .vscode/launch.json |  13
-rw-r--r--  Makefile            |   8
-rw-r--r--  src/deps/uws.zig    |  72
-rw-r--r--  src/http_server.zig | 907
-rw-r--r--  src/io/io_linux.zig |   2
5 files changed, 407 insertions(+), 595 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json index 932a71dc4..36f98a9bb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -99,8 +99,17 @@ { "type": "lldb", "request": "launch", - "name": "fetch debug", - "program": "${workspaceFolder}/misctools/fetch", + "name": "toyhttpserver debug", + "program": "${workspaceFolder}/misctools/toyhttpserver", + "args": ["https://example.com", "--verbose", "3001"], + "cwd": "${workspaceFolder}", + "console": "internalConsole" + }, + { + "type": "lldb", + "request": "launch", + "name": "toyhttpserver lite debug", + "program": "${workspaceFolder}/misctools/toyhttpserver-lite", "args": ["https://example.com", "--verbose"], "cwd": "${workspaceFolder}", "console": "internalConsole" @@ -797,23 +797,23 @@ httpbench-release: $(IO_FILES) .PHONY: toyhttpserver-debug toyhttpserver-debug: $(ZIG) build toyhttpserver-obj - $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) .PHONY: toyhttpserver toyhttpserver: $(ZIG) build -Drelease-fast toyhttpserver-obj - $(CXX) $(PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) rm -rf $(PACKAGE_DIR)/toyhttpserver.o .PHONY: toyhttpserver-lite-debug toyhttpserver-lite-debug: $(ZIG) build toyhttpserver-lite-obj - $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) .PHONY: toyhttpserver-lite toyhttpserver-lite: $(ZIG) build -Drelease-fast toyhttpserver-lite-obj - $(CXX) $(PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES) + $(CXX) $(PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) rm -rf $(PACKAGE_DIR)/toyhttpserver-lite.o .PHONY: check-glibc-version-dependency diff --git a/src/deps/uws.zig b/src/deps/uws.zig index c545de2df..ea033f546 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -17,6 +17,10 @@ pub fn NewSocketHandler(comptime ssl: bool) type { socket: *Socket, const ThisSocket = @This(); + pub fn handle(this: ThisSocket) ?*anyopaque { + return us_socket_get_native_handle(comptime ssl_int, this.socket); + } + pub fn isEstablished(this: ThisSocket) bool { return us_socket_is_established(comptime ssl_int, this.socket) > 0; } @@ -36,7 +40,7 @@ pub fn NewSocketHandler(comptime ssl: bool) type { return us_socket_context( comptime ssl_int, this.socket, - ); + ).?; } pub fn flush(this: ThisSocket) void { return us_socket_flush( @@ -101,6 +105,56 @@ pub fn NewSocketHandler(comptime ssl: bool) type { ); } + pub fn attach(fd: c_int, ctx: *us_socket_context_t) ?ThisSocket { + if (us_socket_attach(comptime ssl_int, fd, ctx)) |socket| { + return ThisSocket{ .socket = socket }; + } else { + return 
null; + } + } + + pub fn detach(this: ThisSocket) c_int { + const handle_ = this.handle().?; + var socket_ = this.socket; + if (us_socket_detach(comptime ssl_int, socket_) == null) + return -1; + + return @intCast(c_int, @ptrToInt(handle_)); + } + + pub fn listen( + host: []const u8, + port: c_int, + socket_ctx: *us_socket_context_t, + comptime Context: type, + ctx: Context, + comptime socket_field_name: []const u8, + ) ?*Context { + // var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator); + // var allocator = stack_fallback.get(); + // var host_ = allocator.dupeZ(u8, host) catch return null; + // defer allocator.free(host_); + _ = host; + + var socket = us_socket_context_listen( + comptime ssl_int, + socket_ctx, + null, + port, + 0, + @sizeOf(Context), + ) orelse return null; + const socket_ = ThisSocket{ .socket = @ptrCast(*Socket, socket) }; + var holder = socket_.ext(Context) orelse { + if (comptime bun.Environment.allow_assert) unreachable; + _ = us_socket_close_connecting(comptime ssl_int, socket_.socket); + return null; + }; + holder.* = ctx; + @field(holder.*, socket_field_name) = socket; + return holder; + } + pub fn connect( host: []const u8, port: c_int, @@ -316,7 +370,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void; extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void; extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque; -extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t; +pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t; extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void; extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void; extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void; @@ -325,7 +379,7 @@ extern fn us_socket_context_on_writable(ssl: c_int, context: ?*us_socket_context extern fn us_socket_context_on_timeout(ssl: c_int, context: ?*us_socket_context_t, on_timeout: fn (*Socket) callconv(.C) ?*Socket) void; extern fn us_socket_context_on_connect_error(ssl: c_int, context: ?*us_socket_context_t, on_connect_error: fn (*Socket, c_int) callconv(.C) ?*Socket) void; extern fn us_socket_context_on_end(ssl: c_int, context: ?*us_socket_context_t, on_end: fn (*Socket) callconv(.C) ?*Socket) void; -extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque; +pub extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque; extern fn us_socket_context_listen(ssl: c_int, context: ?*us_socket_context_t, host: [*c]const u8, port: c_int, options: c_int, socket_ext_size: c_int) ?*listen_socket_t; @@ -391,8 +445,8 @@ pub const Poll = opaque { pub const write_flag = if (Environment.isLinux) std.os.linux.EPOLL.OUT else 2; }; - pub fn deinit(self: *Poll) void { - us_poll_free(self); + pub fn deinit(self: *Poll, loop: *Loop) void { + us_poll_free(self, loop); } // (void* userData, int fd, int events, int 
error, struct us_poll_t *poll) @@ -503,7 +557,11 @@ pub const Request = opaque { extern fn uws_req_get_parameter(res: *Request, index: c_ushort, dest: *[*]const u8) usize; }; -const listen_socket_t = opaque {}; +pub const listen_socket_t = opaque { + pub fn close(this: *listen_socket_t, comptime ssl: bool) void { + us_listen_socket_close(comptime @as(c_int, @boolToInt(ssl)), this); + } +}; extern fn us_listen_socket_close(ssl: c_int, ls: *listen_socket_t) void; pub fn NewApp(comptime ssl: bool) type { @@ -1148,3 +1206,5 @@ pub const uws_app_listen_config_t = extern struct { }; extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void; +extern fn us_socket_detach(ssl: c_int, socket: *Socket) ?*Socket; +extern fn us_socket_attach(ssl: c_int, client_fd: c_int, ctx: *us_socket_context_t) ?*Socket; diff --git a/src/http_server.zig b/src/http_server.zig index 302712df3..31cdb0377 100644 --- a/src/http_server.zig +++ b/src/http_server.zig @@ -19,6 +19,8 @@ const StringPointer = @import("./api/schema.zig").Api.StringPointer; const StringBuilder = @import("./string_builder.zig"); const Lock = @import("./lock.zig").Lock; const log = Output.scoped(.HTTPServer, false); +const uWS = @import("uws"); +const adjustUlimit = @import("./fs.zig").FileSystem.RealFS.adjustUlimit; const ServerConfig = struct { port: u16 = 3001, @@ -33,7 +35,6 @@ pub const constants = struct { const FallbackBufferPool = ObjectPool([16384]u8, null, false, 1024); -const SocketList = HiveArray(Socket, constants.SOCKET_BACKLOG); const IncomingRequest = struct { http_request: HTTPRequest, body_chunk: []const u8 = "", @@ -41,20 +42,18 @@ const IncomingRequest = struct { bytes: []u8, pub fn freeData(this: *IncomingRequest, allocator: std.mem.Allocator) void { - _ = this; - _ = allocator; - // if (this.bytes.len > 0) - // allocator.free(this.bytes); - // this.bytes.len = 0; - // this.bytes.ptr = undefined; - // this.body_chunk = ""; - // if (this.http_request.headers.len > 0) - // allocator.free(this.http_request.headers); - // this.http_request.headers.len = 0; - // this.http_request.headers.ptr = undefined; - } - - pub fn create(allocator: std.mem.Allocator, request_recv: []u8, fd: fd_t, request: HTTPRequest) !IncomingRequest { + if (this.bytes.len > 0) + allocator.free(this.bytes); + this.bytes.len = 0; + this.bytes.ptr = undefined; + this.body_chunk = ""; + if (this.http_request.headers.len > 0) + allocator.free(this.http_request.headers); + this.http_request.headers.len = 0; + this.http_request.headers.ptr = undefined; + } + + pub fn create(allocator: std.mem.Allocator, request_recv: []const u8, fd: fd_t, request: HTTPRequest) !IncomingRequest { var body_chunk = request_recv[@minimum(request.bytes_read, request_recv.len)..]; var string_builder = StringBuilder{}; @@ -74,36 +73,9 @@ const IncomingRequest = struct { const fd_t = std.os.fd_t; -const Data = struct { - value: Value = Value{ .empty = void{} }, - len: u16 = 0, - - pub const Value = union(enum) { - recv_buffer: *RecvBuffer, - fallback_buffer: *FallbackBufferPool.Node, - empty: void, - }; - - pub fn read(this: Data) []u8 { - return switch (this.value) { - .recv_buffer => this.value.recv_buffer[0..this.len], - .fallback_buffer => this.value.fallback_buffer.data[0..this.len], - .empty => &.{}, - }; - } - - pub fn writable(this: Data) []u8 { - return switch (this.value) { - .recv_buffer => this.value.recv_buffer[this.len..], - .fallback_buffer => this.value.fallback_buffer.data[this.len..], - .empty => &.{}, - }; - } -}; - pub const RequestHandler = struct { ctx: 
*anyopaque, - onRequest: fn (ctx: *anyopaque, incoming: IncomingRequest) bool, + onRequest: fn (ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool, pub fn New(comptime HandlerType: type, comptime Function: anytype) type { return struct { @@ -114,214 +86,35 @@ pub const RequestHandler = struct { }; } - pub fn onRequest(ctx: *anyopaque, incoming: IncomingRequest) bool { + pub fn onRequest(ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool { if (@typeInfo(@TypeOf(Function)).Fn.return_type.? == void) { - Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), incoming); + Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming); return true; } - return Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), incoming); + return Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming); } }; } }; -const recv_buffer_len = 4096; -const RecvBuffer = [recv_buffer_len]u8; -const RecvHiveArray = HiveArray(RecvBuffer, 128); - -pub fn sendStaticMessageConcurrent(toy: *ToyHTTPServer, fd: fd_t, message: []const u8) void { - // const CompletionPoolBackup = ObjectPool(AsyncIO.Completion, null, false, 512); - - const doSendError = struct { - pub fn send( - this: *ToyHTTPServer, - completion: *AsyncIO.Completion, - result: AsyncIO.SendError!usize, - ) void { - // defer @fieldParentPtr(CompletionPoolBackup.Node, "data", completion).release(); - - const amt = result catch |err| { - if (err != error.EBADF) - sendClose(completion.operation.send.socket); - return; - }; - if (completion.operation.send.disconnected) { - sendClose(completion.operation.send.socket); - return; - } - const remain = completion.operation.send.buf[0..completion.operation.send.len][amt..]; - if (remain.len == 0) { - this.server.takeAsync(completion.operation.send.socket); - return; - } - - this.io.sendNow(*ToyHTTPServer, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0); - } - }.send; - toy.io.sendNow(*ToyHTTPServer, toy, doSendError, CompletionPool.get(), fd, message, 0); -} - -const CompletionPool = struct { - pub fn get() *AsyncIO.Completion { - return bun.default_allocator.create(AsyncIO.Completion) catch unreachable; - } -}; - -pub fn sendStaticMessage(server: *Server, fd: fd_t, message: []const u8) void { - const doSendError = struct { - pub fn send( - this: *Server, - completion: *AsyncIO.Completion, - result: AsyncIO.SendError!usize, - ) void { - var amt = result catch { - sendClose(completion.operation.send.socket); - return; - }; - const remain = completion.operation.send.buf[0..completion.operation.send.len][amt..]; - if (completion.operation.send.disconnected) { - sendClose(completion.operation.send.socket); - return; - } - if (remain.len == 0) { - this.take(completion.operation.send.socket); - return; - } - - AsyncIO.global.send(*Server, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0); - } - }.send; - AsyncIO.global.send(*Server, server, doSendError, CompletionPool.get(), fd, message, 0); -} - -pub fn sendStaticMessageWithoutClosing(server: *Server, fd: fd_t, message: []const u8) void { - const doSendError = struct { - pub fn send( - this: *Server, - completion: *AsyncIO.Completion, - result: AsyncIO.SendError!usize, - ) void { - var amt = result catch { - sendClose(completion.operation.send.socket); - return; - }; - if (completion.operation.send.disconnected) { - sendClose(completion.operation.send.socket); - return; - } - - const remain = 
completion.operation.send.buf[0..completion.operation.send.len][amt..]; - if (remain.len == 0) { - this.take(completion.operation.send.socket); - return; - } - - AsyncIO.global.send(*Server, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0); - } - }.send; - AsyncIO.global.send(*Server, server, doSendError, CompletionPool.get(), fd, message, 0); -} - pub const Server = struct { - recv_buffer: RecvHiveArray = RecvHiveArray.init(), - listener: fd_t, - accept_completion: AsyncIO.Completion = undefined, + listener: *uWS.listen_socket_t, + ctx: *uWS.us_socket_context_t, status: Status = Status.open, - sockets: SocketList = SocketList.init(), handler: RequestHandler, - shutdown_completion: AsyncIO.Completion = undefined, shutdown_requested: bool = false, - - pending_sockets_to_return: PendingSocketsList = PendingSocketsList.init(), - pending_sockets_to_return_lock: Lock = Lock.init(), - pending_socket_return_task: NetworkThread.Task = .{ .callback = flushPendingSocketsToReturn }, - pending_sockets_to_return_scheduled: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - - after_callback: AsyncIO.Callback = .{ - .callback = enqueueAcceptOpaque, - .ctx = undefined, - }, - - has_pending_accept: bool = false, - - pub fn flushPendingSocketsToReturn(task: *NetworkThread.Task) void { - var server: *Server = @fieldParentPtr(Server, "pending_socket_return_task", task); - server.pending_sockets_to_return_scheduled.store(0, .Monotonic); - server.pending_sockets_to_return_lock.lock(); - var sockets_slice = server.pending_sockets_to_return.readableSlice(0); - var stack_fallback = std.heap.stackFallback(4096, bun.default_allocator); - var allocator = stack_fallback.get(); - var list = allocator.dupe(u32, sockets_slice) catch unreachable; - server.pending_sockets_to_return.head = 0; - server.pending_sockets_to_return.count = 0; - server.pending_sockets_to_return_lock.unlock(); - - defer { - if (!stack_fallback.fixed_buffer_allocator.ownsSlice(std.mem.sliceAsBytes(list))) { - allocator.free(list); - } - } - - for (list) |fd| { - if (comptime Environment.isMac) { - std.os.getsockoptError(@intCast(c_int, fd)) catch { - sendClose(@intCast(c_int, fd)); - continue; - }; - } - - if (server.sockets.get()) |socket| { - socket.* = .{ - .fd = @intCast(fd_t, fd), - .server_ = server, - }; - - socket.enqueueRecv() catch unreachable; - } else { - sendClose(@intCast(fd_t, fd)); - } - } - } + loop: *uWS.Loop = undefined, const PendingSocketsList = std.fifo.LinearFifo(u32, .{ .Static = constants.SOCKET_BACKLOG }); - pub fn takeAsync(this: *Server, socket: fd_t) void { - - // var err_code: i32 = undefined; - // var size: u32 = @sizeOf(u32); - // _ = std.c.getsockopt(@intCast(fd_t, socket), std.os.SOL.SOCKET, AsyncIO.darwin.SO_NREAD, @ptrCast([*]u8, &err_code), &size); - // if (err_code == 0) { - // std.os.shutdown(socket, .both) catch {}; - // std.os.close(socket); - // } - // return; - // } - - this.pending_sockets_to_return_lock.lock(); - { - this.pending_sockets_to_return.writeItemAssumeCapacity(@intCast(u32, socket)); - } - if (this.pending_sockets_to_return_scheduled.fetchAdd(1, .Monotonic) == 0) - NetworkThread.global.schedule(NetworkThread.Batch.from(&this.pending_socket_return_task)); - this.pending_sockets_to_return_lock.unlock(); - } - - pub fn take(server: *Server, fd: fd_t) void { - if (server.sockets.get()) |socket| { - socket.* = .{ - .fd = @intCast(fd_t, fd), - .server_ = server, - }; - - socket.enqueueRecv() catch unreachable; - } else { - sendClose(@intCast(fd_t, fd)); - } - } 
- pub fn quiet(this: *Server) void { + if (this.status != .open) + return; + this.status = .closing; + this.listener.close(false); + this.status = .closed; } pub const Status = enum { @@ -336,23 +129,6 @@ pub const Server = struct { this.shutdown_requested = true; log("shutdown"); this.quiet(); - AsyncIO.global.nextTick(*Server, this, doShutdown, &this.shutdown_completion); - } - - pub fn doShutdown(this: *Server, _: *AsyncIO.Completion, _: void) void { - log("doShutdown"); - this.status = Status.closed; - var iter = this.sockets.available.iterator(.{ - .kind = .unset, - }); - - while (iter.next()) |id| { - this.sockets.buffer[id].reset(); - this.sockets.buffer[id].closeWithoutReset(); - } - - this.sockets = SocketList.init(); - this.recv_buffer = RecvHiveArray.init(); } pub fn boot() void {} @@ -361,138 +137,57 @@ pub const Server = struct { log("start port: {d}", .{config.port}); var server = try bun.default_allocator.create(Server); + var ctx = server.createContext() orelse return error.OutOfMemory; + uWS.SocketTCP.configure( + ctx, + Connection, + Connection.onOpen, + Connection.onClose, + Connection.onData, + Connection.onWritable, + Connection.onTimeout, + Connection.onConnectError, + Connection.onEnd, + ); + server.* = .{ - .listener = brk: { - if (comptime Environment.isMac) { - break :brk AsyncIO.createListenSocket(config.host, config.port, config.reuse_port); - } else { - break :brk try AsyncIO.openSocket(std.os.AF.INET, constants.OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP); - } - }, + .listener = undefined, + .ctx = ctx, .handler = handler, + .status = .open, + .loop = uWS.Loop.get().?, }; - server.after_callback.ctx = server; - { - // var listener: std.x.net.tcp.Listener = .{ - // .socket = .{ - // .fd = server.listener, - // }, - // }; - server.enqueueAccept(); - // listener.setKeepAlive(false) catch {}; - // try listener.bind(std.x.net.ip.Address.initIPv4(std.x.os.IPv4.unspecified, config.port)); - // try listener.listen(constants.SOCKET_BACKLOG); + if (uWS.SocketTCP.listen(config.host, config.port, ctx, *Server, server, "listener") == null) { + return error.ListenFailed; } - // try AsyncIO.global.on_after.append(bun.default_allocator, server.after_callback); - return server; - } - - pub fn enqueueAcceptOpaque(server: *anyopaque) void { - enqueueAccept(@ptrCast(*Server, @alignCast(@alignOf(Server), server))); - } - - pub fn enqueueAccept(server: *Server) void { - AsyncIO.global.acceptNow(*Server, server, onAccept, &server.accept_completion, server.listener); - } - - pub fn onAccept( - this: *Server, - complete: *AsyncIO.Completion, - result_: AsyncIO.AcceptError!std.os.socket_t, - ) void { - var remain: usize = @as(usize, complete.operation.accept.backlog); - - var fd = result_ catch |err| { - log("onAccept error: {s}", .{@errorName(err)}); - return; + server.* = .{ + .listener = server.listener, + .ctx = ctx, + .handler = handler, + .status = .open, + .loop = uWS.Loop.get().?, }; - - _ = AsyncIO.Syscall.fcntl(fd, std.os.F.SETFL, (AsyncIO.Syscall.fcntl(fd, std.os.F.GETFL, 0) catch 0) | std.os.O.NONBLOCK | std.os.O.CLOEXEC) catch 0; - - if (this.handleAccept(fd)) { - if (comptime Environment.isMac) { - while (remain > 0) : (remain -= 1) { - const sockfd = AsyncIO.darwin.@"accept$NOCANCEL"(this.listener, null, null); - if (sockfd < 0) { - break; - } - fd = sockfd; - - AsyncIO.Syscall.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, &std.mem.toBytes(@as(c_int, 1))) catch {}; - _ = AsyncIO.Syscall.fcntl(fd, std.os.F.SETFL, (AsyncIO.Syscall.fcntl(fd, 
std.os.F.GETFL, 0) catch 0) | std.os.O.NONBLOCK | std.os.O.CLOEXEC) catch 0; - // _ = AsyncIO.Syscall.fcntl(fd, std.os.FD_CLOEXEC, 1) catch 0; - - if (!this.handleAccept(fd)) - break; - } - } - } + return server; } - - fn handleAccept(this: *Server, fd: std.os.socket_t) bool { - if (this.status == .closing or this.status == .closed) { - log("onAccept closing fd: {d} because not accepting connections", .{fd}); - std.os.close(fd); - return false; - } - - var socket = this.sockets.get() orelse { - log("onAccept closing fd: {d} because no sockets available", .{fd}); - std.os.close(fd); - return false; - }; - - socket.* = .{ - .fd = fd, - .server_ = this, - }; - - socket.enqueueRecv() catch { - log("onAccept closing fd: {d} because enqueueRecv failed", .{fd}); - std.os.close(fd); - std.debug.assert(this.sockets.put(socket)); - }; - return true; + pub fn createContext(server: *Server) ?*uWS.us_socket_context_t { + var loop = uWS.Loop.get().?; + var ctx = uWS.us_create_socket_context(0, loop, @sizeOf(*Server), .{}) orelse return null; + var ptr = @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, ctx).?)); + ptr.* = server; + return ctx; } - pub fn dispatch(this: *Server, socket: *Socket, request: HTTPRequest) void { - var incoming_request = IncomingRequest.create(bun.default_allocator, socket.data.read(), socket.fd, request) catch { - log("Dropping request due to OOM!", .{}); - socket.reset(); - return; - }; - - // Reset the data before calling the handler to free up memory for the next request. - socket.reset(); - std.debug.assert(this.sockets.put(socket)); - - if (!this.handler.onRequest(this.handler.ctx, incoming_request)) { - log("Dropping request due to handler failure!", .{}); + pub fn dispatch(this: *Server, connection: *Connection, incoming_request: IncomingRequest) void { + if (this.handler.onRequest(this.handler.ctx, connection, incoming_request)) { return; } + _ = connection.socket.write(bad_request, false); + connection.socket.close(0, null); } }; -fn sendClose(fd: fd_t) void { - std.os.getsockoptError(fd) catch {}; - std.os.shutdown(fd, std.os.ShutdownHow.both) catch {}; - - if (comptime Environment.isLinux) { - const Closer = struct { - pub fn onClose(_: void, completion: *AsyncIO.Completion, _: AsyncIO.CloseError!void) void { - var node = @fieldParentPtr(CompletionPool.Node, "data", completion); - node.releaase(); - } - }; - - AsyncIO.global.close(void, void{}, Closer.onClose, CompletionPool.get(), fd); - } else { - std.os.close(fd); - } -} - const CompletionSwapper = struct { first: AsyncIO.Completion = undefined, second: AsyncIO.Completion = undefined, @@ -549,194 +244,96 @@ const hello_world = "HTTP/1.1 200 OK" ++ CRLF ++ CRLF ++ "Hello, world!"; -pub const Socket = struct { - recv_completion: CompletionSwapper = CompletionSwapper{}, - fd: fd_t, - data: Data = .{}, - server_: *Server, - - pub fn reset(this: *Socket) void { - switch (this.data.value) { - .recv_buffer => |buf| { - std.debug.assert(this.server().recv_buffer.put(buf)); - this.data = .{ .value = .{ .empty = void{} } }; - }, - .fallback_buffer => |buf| { - buf.release(); - this.data = .{ .value = .{ .empty = void{} } }; - }, - .empty => {}, - } - this.recv_completion = CompletionSwapper{}; - } - - pub fn consume(this: *Socket, buf: []u8) !void { - var writable = this.data.writable(); - if (buf.ptr == writable.ptr and writable.len >= buf.len) { - this.data.len += @truncate(u16, buf.len); - return; - } else if (writable.len >= buf.len) { - @memcpy(writable.ptr, buf.ptr, buf.len); - 
this.data.len += @truncate(u16, buf.len); - return; - } - const start_len = this.data.len; - - switch (this.data.value) { - .recv_buffer => |recv| { - var fallback = FallbackBufferPool.get(bun.default_allocator); - @memcpy(&fallback.data, recv, start_len); - std.debug.assert(this.server().recv_buffer.put(recv)); - @memcpy(fallback.data[start_len..].ptr, buf.ptr, buf.len); - this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len + start_len) }; - }, - .fallback_buffer => { - return error.TooBig; - }, - .empty => { - if (buf.len <= recv_buffer_len) { - if (this.server().recv_buffer.get()) |recv| { - @memcpy(recv, buf.ptr, buf.len); - this.data = .{ .value = .{ .recv_buffer = recv }, .len = @truncate(u16, buf.len) }; - return; - } - } - - if (buf.len <= 16384) { - var fallback = FallbackBufferPool.get(bun.default_allocator); - @memcpy(&fallback.data, buf.ptr, buf.len); - this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len) }; - } - - return error.TooBig; - }, - } - } - - pub fn cancelTimeout(this: *Socket) void { - _ = this; - } - - fn getNextBuffer(this: *Socket) []u8 { - var next_buffer: []u8 = this.data.writable(); - - if (next_buffer.len < 512) { - var buf = this.data.read(); - if (buf.len == 0) { - if (this.server().recv_buffer.get()) |recv| { - this.data = .{ .value = .{ .recv_buffer = recv }, .len = @truncate(u16, buf.len) }; - return this.data.writable(); - } - } - - if (this.data.value == .recv_buffer) { - var fallback = FallbackBufferPool.get(bun.default_allocator); - @memcpy(&fallback.data, buf.ptr, buf.len); - this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len) }; - return this.data.writable(); - } +pub const Connection = struct { + socket: uWS.SocketTCP, + incoming_request: IncomingRequest = undefined, + is_writable: bool = false, + has_received: bool = false, + has_incoming_request: bool = false, - if (this.data.value == .empty) { - var fallback = FallbackBufferPool.get(bun.default_allocator); - this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = 0 }; - return this.data.writable(); - } - } - - return next_buffer; + pub fn onOpen(this: *Connection, socket: uWS.SocketTCP) void { + this.socket = socket; + socket.timeout(30); + this.is_writable = false; + log("Client connected", .{}); } - pub fn enqueueRecv(this: *Socket) !void { - this.setTimeout(); - - var next_buffer = this.getNextBuffer(); - if (next_buffer.len == 0) { - return error.TooBig; - } - - AsyncIO.global.recv( - *Socket, - this, - Socket.onRecv, - CompletionPool.get(), - this.fd, - next_buffer, - ); + fn dispatch(this: *Connection, incoming_request: IncomingRequest) void { + this.has_received = false; + this.is_writable = false; + this.server().dispatch(this, incoming_request); + return; } - pub fn close(this: *Socket) void { - this.reset(); + pub fn onClose(this: *Connection, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void { + _ = this; + _ = socket; - this.closeWithoutReset(); - std.debug.assert(this.server().sockets.put(this)); + log("Client disconnected", .{}); } - pub fn closeWithoutReset(this: *Socket) void { - const fd = this.fd; - std.debug.assert(fd > 0); - this.fd = 0; + pub fn onWritable(this: *Connection, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; - sendClose(fd); + this.is_writable = true; } - pub fn onRecv( - this: *Socket, - completion: *AsyncIO.Completion, - read_: AsyncIO.RecvError!usize, - ) void { - const read = read_ catch |err| { - log("onRecv error: {s}", 
.{@errorName(err)}); - this.close(); - return; - }; - - if (read == 0) { - log("onRecv disconnected socket", .{}); - this.close(); - return; - } - - this.consume(completion.operation.recv.buf[0..read]) catch |err| { - switch (err) { - error.TooBig => { - log("onRecv TooBig", .{}); - this.reset(); - sendStaticMessage(this.server(), this.fd, request_header_fields_too_large); - - return; - }, - } - }; + pub fn onData(this: *Connection, socket: uWS.SocketTCP, data: []const u8) void { + _ = this; + _ = socket; + _ = data; + socket.timeout(30); var headers: [512]picohttp.Header = undefined; - const request = HTTPRequest.parse(this.data.read(), &headers) catch |err| { + const request = HTTPRequest.parse(data, &headers) catch |err| { switch (err) { error.BadRequest => { log("onRecv bad request", .{}); - this.reset(); - sendStaticMessage(this.server(), this.fd, bad_request); - + this.socket.close(0, null); return; }, error.ShortRead => { - this.enqueueRecv() catch { - log("onRecv TooBig (on enqueue)", .{}); - this.reset(); - sendStaticMessage(this.server(), this.fd, request_header_fields_too_large); - }; return; }, } }; - log("onRecv request: {any}", .{request}); - this.cancelTimeout(); - this.server().dispatch(this, request); + + const fd = @intCast(fd_t, @ptrToInt(socket.handle().?)); + // if (this.has_incoming_request) { + // this.incoming_request.freeData(bun.default_allocator); + // } + this.has_received = true; + this.has_incoming_request = true; + this.dispatch(IncomingRequest.create(bun.default_allocator, data, fd, request) catch { + log("Dropping request due to OOM!", .{}); + this.socket.close(0, null); + return; + }); + } + + pub fn onTimeout(this: *Connection, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; + socket.close(0, null); + } + + pub fn onConnectError(this: *Connection, socket: uWS.SocketTCP, code: c_int) void { + _ = this; + _ = socket; + _ = code; } - pub fn setTimeout(_: *Socket) void {} + pub fn onEnd(this: *Connection, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; + + socket.shutdown(); + socket.close(0, null); + } - pub fn server(this: *Socket) *Server { - return this.server_; + pub inline fn server(this: Connection) *Server { + return @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, this.socket.context())).?).*; } }; @@ -745,128 +342,274 @@ const NetworkThread = @import("./network_thread.zig"); pub const ToySingleThreadedHTTPServer = struct { pub const Handler = RequestHandler.New(ToySingleThreadedHTTPServer, onRequest); server: *Server, - task: NetworkThread.Task = .{ .callback = startServer }, pub fn onRequest( this: *ToySingleThreadedHTTPServer, - incoming: IncomingRequest, - ) void { - log("onRequest: {any}", .{incoming}); - sendStaticMessageWithoutClosing(this.server, incoming.fd, hello_world); - var inc = incoming; - inc.freeData(bun.default_allocator); - } - - pub fn drain(_: *ToySingleThreadedHTTPServer) void {} - - pub fn loop(this: *ToySingleThreadedHTTPServer) void { - this.drain(); + connection: *Connection, + _: IncomingRequest, + ) bool { + _ = this; - while (true) { - AsyncIO.global.wait(this, drain); + const wrote = connection.socket.write(hello_world, true); + if (wrote < hello_world.len) { + log("onRequest: write failed", .{}); + connection.socket.close(0, null); + return false; } + + // incoming.freeData(bun.default_allocator); + return true; } - pub fn startServer(task: *NetworkThread.Task) void { + pub fn startServer(toy: *ToySingleThreadedHTTPServer) void { var toy_config = ServerConfig{ .port = 
std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001, }; defer Output.prettyln("Server started on port {d}", .{toy_config.port}); defer Output.flush(); - var toy = @fieldParentPtr(ToySingleThreadedHTTPServer, "task", task); toy.server = Server.start(toy_config, RequestHandler.New(ToySingleThreadedHTTPServer, onRequest).init(toy)) catch unreachable; + toy.server.loop.run(); } pub fn main() anyerror!void { var http = try bun.default_allocator.create(ToySingleThreadedHTTPServer); + http.* = .{ .server = undefined }; var stdout_ = std.io.getStdOut(); var stderr_ = std.io.getStdErr(); var output_source = Output.Source.init(stdout_, stderr_); Output.Source.set(&output_source); + _ = try adjustUlimit(); defer Output.flush(); - try NetworkThread.init(); - http.* = .{ - .server = undefined, - }; - NetworkThread.global.schedule(NetworkThread.Batch.from(&http.task)); - while (true) { - std.time.sleep(std.time.ns_per_hour); - } + startServer(http); } }; pub const ToyHTTPServer = struct { pub const Handler = RequestHandler.New(*ToyHTTPServer, onRequest); - const Fifo = std.fifo.LinearFifo(IncomingRequest, .Dynamic); + const Fifo = std.fifo.LinearFifo(IncomingRequest, .{ .Static = 4096 }); server: *Server, - pending: Fifo, + first_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable, + second_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable, + first_is_active: bool = true, + incoming_list: *std.BoundedArray(IncomingRequest, 8096) = undefined, + outgoing_list: std.atomic.Atomic(*std.BoundedArray(IncomingRequest, 8096)) = undefined, active: Fifo, lock: Lock = Lock.init(), - io: AsyncIO, - task: NetworkThread.Task = .{ .callback = startServer }, + loop: *uWS.Loop, + has_scheduled: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), + ctx: *uWS.us_socket_context_t = undefined, + waker: AsyncIO.Waker = undefined, + // active_requests: HiveArray(WritableSocket, 1024) = HiveArray(WritableSocket, 1024).init(), pub fn onRequest( this: *ToyHTTPServer, + _: *Connection, incoming: IncomingRequest, - ) void { + ) bool { { this.lock.lock(); - this.pending.writeItem(incoming) catch unreachable; defer this.lock.unlock(); + this.outgoing_list.loadUnchecked().appendAssumeCapacity(incoming); } - this.io.waker.wake() catch unreachable; + + if (this.outgoing_list.loadUnchecked().len > 20) + this.waker.wake() catch unreachable; + + return true; } pub fn drain(this: *ToyHTTPServer) void { - const all = this.pending.readableSlice(0); - this.active.write(all) catch unreachable; - this.pending.count = 0; - this.pending.head = 0; - } + var ctx = this.ctx; - pub fn loop(this: *ToyHTTPServer) void { - this.drain(); + { + this.lock.lock(); + defer { + this.lock.unlock(); + } - while (true) { - while (this.active.readItem()) |incoming| { - // defer incoming.freeData(bun.default_allocator); - sendStaticMessageConcurrent(this, incoming.fd, hello_world); + if (this.first_is_active) { + this.first_is_active = false; + this.second_list.len = 0; + + this.incoming_list = this.outgoing_list.value; + this.outgoing_list.value = &this.second_list; + } else { + this.first_is_active = true; + this.first_list.len = 0; + + this.incoming_list = this.outgoing_list.value; + this.outgoing_list.value = &this.first_list; } + } - this.io.wait(this, drain); + const slice = this.incoming_list.slice(); + for (slice) |incoming| { + const sent = AsyncIO.darwin.@"sendto"(incoming.fd, hello_world, 
hello_world.len, 0, null, 0); + if (sent < hello_world.len) { + var socket = uWS.SocketTCP.attach(incoming.fd, ctx) orelse continue; + _ = socket.write(hello_world, true); + } } + this.incoming_list.len = 0; } - pub fn startServer(task: *NetworkThread.Task) void { + // pub fn dispatch(this: *ToyHTTPServer, socket: *WritableSocket, _: IncomingRequest) void { + // this.server.takeAsync(socket.socket.detach()); + // } + + pub const WritableSocket = struct { + socket: uWS.SocketTCP, + incoming_request: IncomingRequest = undefined, + is_writable: bool = false, + has_received: bool = false, + has_incoming_request: bool = false, + + pub fn onOpen(_: *WritableSocket, _: uWS.SocketTCP) void { + // this.socket = socket; + // socket.timeout(30); + // this.is_writable = false; + // log("Client connected", .{}); + } + + pub fn dispatch(this: *WritableSocket) void { + this.has_received = false; + this.is_writable = false; + // this.server().dispatch(this, this.incoming_request); + return; + } + + pub fn onClose(this: *WritableSocket, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void { + _ = this; + _ = socket; + + log("Client disconnected", .{}); + } + + pub fn onWritable(this: *WritableSocket, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; + + this.is_writable = true; + } + + pub fn onData(this: *WritableSocket, socket: uWS.SocketTCP, data: []const u8) void { + _ = this; + _ = socket; + _ = data; + // socket.timeout(30); + + // var headers: [512]picohttp.Header = undefined; + // const request = HTTPRequest.parse(data, &headers) catch |err| { + // switch (err) { + // error.BadRequest => { + // log("onRecv bad request", .{}); + // this.socket.close(0, null); + // return; + // }, + // error.ShortRead => { + // return; + // }, + // } + // }; + + // const fd = @intCast(fd_t, @ptrToInt(socket.handle().?)); + // if (this.has_incoming_request) { + // this.incoming_request.freeData(bun.default_allocator); + // } + // this.incoming_request = IncomingRequest.create(bun.default_allocator, data, fd, request) catch { + // log("Dropping request due to OOM!", .{}); + // this.socket.close(0, null); + // return; + // }; + // this.has_received = true; + // this.has_incoming_request = true; + // this.dispatch(); + } + + pub fn onTimeout(this: *WritableSocket, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; + socket.close(0, null); + } + + pub fn onConnectError(this: *WritableSocket, socket: uWS.SocketTCP, code: c_int) void { + _ = this; + _ = socket; + _ = code; + } + + pub fn onEnd(this: *WritableSocket, socket: uWS.SocketTCP) void { + _ = this; + _ = socket; + + socket.shutdown(); + socket.close(0, null); + } + + pub inline fn server(this: WritableSocket) *ToyHTTPServer { + return @ptrCast(**ToyHTTPServer, @alignCast(@alignOf(*ToyHTTPServer), uWS.us_socket_context_ext(0, this.socket.context())).?).*; + } + }; + + fn scheduleWakeup(this: *ToyHTTPServer) void { + if (this.has_scheduled.load(.Monotonic) == 0) return; + + this.waker.wake() catch unreachable; + } + pub fn startServer(toy: *ToyHTTPServer) void { + Output.Source.configureNamedThread("ToyHTTPServer"); var toy_config = ServerConfig{ .port = std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001, }; defer Output.prettyln("Server started on port {d}", .{toy_config.port}); defer Output.flush(); - var toy = @fieldParentPtr(ToyHTTPServer, "task", task); toy.server = Server.start(toy_config, RequestHandler.New(ToyHTTPServer, onRequest).init(toy)) catch unreachable; + _ = toy.server.loop.addPostHandler(*ToyHTTPServer, toy, 
scheduleWakeup); + toy.server.loop.run(); } pub fn main() anyerror!void { var http = try bun.default_allocator.create(ToyHTTPServer); - var stdout_ = std.io.getStdOut(); var stderr_ = std.io.getStdErr(); var output_source = Output.Source.init(stdout_, stderr_); + _ = try adjustUlimit(); Output.Source.set(&output_source); defer Output.flush(); - try NetworkThread.init(); http.* = .{ - .pending = Fifo.init(bun.default_allocator), - .active = Fifo.init(bun.default_allocator), - .io = try AsyncIO.init(1024, 0, try AsyncIO.Waker.init(bun.default_allocator)), + .active = Fifo.init(), .server = undefined, + .loop = uWS.Loop.get().?, + .waker = AsyncIO.Waker.init(bun.default_allocator) catch unreachable, }; - NetworkThread.global.schedule(NetworkThread.Batch.from(&http.task)); - http.loop(); + http.incoming_list = &http.first_list; + http.outgoing_list.value = &http.second_list; + + http.ctx = uWS.us_create_socket_context(0, http.loop, 8, .{}).?; + uWS.SocketTCP.configure( + http.ctx, + WritableSocket, + WritableSocket.onOpen, + WritableSocket.onClose, + WritableSocket.onData, + WritableSocket.onWritable, + WritableSocket.onTimeout, + WritableSocket.onConnectError, + WritableSocket.onEnd, + ); + + @ptrCast(**anyopaque, @alignCast(@alignOf(*anyopaque), uWS.us_socket_context_ext(0, http.ctx).?)).* = @ptrCast(*anyopaque, &http); + _ = http.loop.addPostHandler(*ToyHTTPServer, http, drain); + var thread = std.Thread.spawn(.{}, startServer, .{http}) catch unreachable; + http.drain(); + thread.detach(); + while (true) { + _ = http.waker.wait() catch 0; + http.drain(); + } } }; diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig index af5382053..ccefa4a1b 100644 --- a/src/io/io_linux.zig +++ b/src/io/io_linux.zig @@ -995,7 +995,7 @@ pub const Waker = struct { pub fn wait(this: Waker) !u64 { var bytes: usize = 0; - _ = std.os.read(this.fd, @ptrCast(*[8]u8, &bytes)) catch 0; + _ = try std.os.read(this.fd, @ptrCast(*[8]u8, &bytes)); return @intCast(u64, bytes); } |