diff options
author | 2022-03-02 21:17:38 -0800 | |
---|---|---|
committer | 2022-03-02 21:17:38 -0800 | |
commit | 4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff (patch) | |
tree | 2ef421c42843345beb9979ab959bdf7764dd5b85 | |
parent | 117ca5355d4bda2d88bcd55ed389057ed5358839 (diff) | |
download | bun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.tar.gz bun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.tar.zst bun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.zip |
[bun dev] Improve HMR performance by pooling websocket threads
Previously, bun would create a new thread for each websocket connection. Now, it re-uses them
Eventually, this should use evented i/o but other changes need to be made to support that
-rw-r--r-- | src/http.zig | 96 |
1 files changed, 59 insertions, 37 deletions
diff --git a/src/http.zig b/src/http.zig index f848fbc1e..168982077 100644 --- a/src/http.zig +++ b/src/http.zig @@ -34,9 +34,12 @@ const DotEnv = @import("./env_loader.zig"); const mimalloc = @import("./allocators/mimalloc.zig"); const MacroMap = @import("./resolver/package_json.zig").MacroMap; const Analytics = @import("./analytics/analytics_thread.zig"); -const Arena = @import("./mimalloc_arena.zig").Arena; +const MiArena = @import("./mimalloc_arena.zig").Arena; +const Arena = MiArena; const ArenaType = Arena; const JSON = @import("./json_parser.zig"); +const DateTime = @import("datetime"); +const ThreadPool = @import("thread_pool"); pub fn constStrToU8(s: string) []u8 { return @intToPtr([*]u8, @ptrToInt(s.ptr))[0..s.len]; } @@ -99,6 +102,7 @@ pub const RequestContext = struct { timer: std.time.Timer, matched_route: ?Router.Match = null, origin: ZigURL, + datetime_buf: [512]u8 = undefined, full_url: [:0]const u8 = "", res_headers_count: usize = 0, @@ -575,18 +579,16 @@ pub const RequestContext = struct { return std.fmt.comptimePrint("HTTP/1.1 {d} {s}\r\n", .{ code, status_text }); } - var status_line_error_buf: [1024]u8 = undefined; - pub fn printStatusLineError(err: anyerror) []const u8 { - return std.fmt.bufPrint(&status_line_error_buf, "HTTP/1.1 500 {s}\r\n", .{@errorName(err)}) catch unreachable; + pub fn printStatusLineError(err: anyerror, buf: []u8) []const u8 { + return std.fmt.bufPrint(buf, "HTTP/1.1 500 {s}\r\n", .{@errorName(err)}) catch unreachable; } - threadlocal var content_length_header_buf: [64]u8 = undefined; - pub fn prepareToSendBody( ctx: *RequestContext, length: usize, comptime chunked: bool, ) !void { + var content_length_header_buf: [64]u8 = undefined; defer { if (Environment.allow_assert) { std.debug.assert(!ctx.has_written_last_header); @@ -657,7 +659,8 @@ pub const RequestContext = struct { } pub fn writeStatusError(ctx: *RequestContext, err: anyerror) !void { - _ = try ctx.writeSocket(printStatusLineError(err), SOCKET_FLAGS); + var status_line_error_buf: [1024]u8 = undefined; + _ = try ctx.writeSocket(printStatusLineError(err, &status_line_error_buf), SOCKET_FLAGS); ctx.status = @as(HTTPStatusCode, 500); } @@ -852,7 +855,7 @@ pub const RequestContext = struct { pub fn sendHTMLFile(ctx: *RequestContext, file: std.fs.File) !void { ctx.appendHeader("Content-Type", MimeType.html.value); - ctx.appendHeader("Cache-Control", "no-cache"); + ctx.appendHeader("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"); defer ctx.done(); @@ -866,15 +869,22 @@ pub const RequestContext = struct { try ctx.writeStatus(200); try ctx.prepareToSendBody(content_length, false); - _ = try std.os.sendfile( - ctx.conn.client.socket.fd, - ctx.bundler.options.routes.single_page_app_fd, - 0, - content_length, - &[_]std.os.iovec_const{}, - &[_]std.os.iovec_const{}, - 0, - ); + var remain = content_length; + while (remain > 0) { + const wrote = try std.os.sendfile( + ctx.conn.client.socket.fd, + ctx.bundler.options.routes.single_page_app_fd, + content_length - remain, + remain, + &[_]std.os.iovec_const{}, + &[_]std.os.iovec_const{}, + 0, + ); + if (wrote == 0) { + break; + } + remain -|= wrote; + } } pub const WatchBuilder = struct { @@ -1252,7 +1262,7 @@ pub const RequestContext = struct { } pub fn handleFetchEventError(this: *HandlerThread, err: anyerror, js_value: JavaScript.JSValue, ctx: *RequestContext) !void { - var arena = ArenaType.init() catch unreachable; + var arena = MiArena.init() catch unreachable; var allocator = arena.allocator(); defer arena.deinit(); @@ -1470,7 +1480,7 @@ pub const RequestContext = struct { vm.global.vm().holdAPILock(handler, JavaScript.OpaqueWrap(HandlerThread, startJavaScript)); } - var __arena: Arena = undefined; + var __arena: MiArena = undefined; pub fn runLoop(vm: *JavaScript.VirtualMachine, thread: *HandlerThread) !void { var module_map = JavaScript.ZigGlobalObject.getModuleRegistryMap(vm.global); @@ -1483,7 +1493,7 @@ pub const RequestContext = struct { } while (true) { - __arena = try Arena.init(); + __arena = MiArena.init() catch unreachable; JavaScript.VirtualMachine.vm.arena = &__arena; JavaScript.VirtualMachine.vm.has_loaded = true; JavaScript.VirtualMachine.vm.tick(); @@ -1599,6 +1609,7 @@ pub const RequestContext = struct { builder: WatchBuilder, message_buffer: MutableString, bundler: Bundler, + task: ThreadPool.Task, pub var open_websockets: std.ArrayList(*WebsocketHandler) = undefined; var open_websockets_lock = sync.RwLock.init(); pub fn addWebsocket(ctx: *RequestContext, server: *Server) !*WebsocketHandler { @@ -1610,7 +1621,7 @@ pub const RequestContext = struct { clone.conn = ctx.conn.*; try ctx.bundler.clone(server.allocator, &clone.bundler); ctx.bundler = &clone.bundler; - + clone.task = .{ .callback = onTask }; clone.message_buffer = try MutableString.init(server.allocator, 0); clone.ctx.conn = &clone.conn; clone.ctx.log = logger.Log.init(server.allocator); @@ -1694,18 +1705,23 @@ pub const RequestContext = struct { } } + pub fn onSpawnThread(_: ?*anyopaque) ?*anyopaque { + Global.setThreadName("HMR"); + Output.Source.configureThread(); + js_ast.Stmt.Data.Store.create(default_allocator); + js_ast.Expr.Data.Store.create(default_allocator); + return null; + } + + pub fn onTask(self: *ThreadPool.Task) void { + handle(@fieldParentPtr(WebsocketHandler, "task", self)); + } pub fn handle(self: *WebsocketHandler) void { - var stdout = std.io.getStdOut(); - // var stdout = std.io.bufferedWriter(stdout_file.writer()); - var stderr = std.io.getStdErr(); - // var stderr = std.io.bufferedWriter(stderr_file.writer()); - var output_source = Output.Source.init(stdout, stderr); - // defer stdout.flush() catch {}; - // defer stderr.flush() catch {}; - Output.Source.set(&output_source); - Output.enable_ansi_colors = stderr.isTty(); - js_ast.Stmt.Data.Store.create(self.ctx.allocator); - js_ast.Expr.Data.Store.create(self.ctx.allocator); + defer { + js_ast.Stmt.Data.Store.reset(); + js_ast.Expr.Data.Store.reset(); + } + self.builder.printer = js_printer.BufferPrinter.init( js_printer.BufferWriter.init(self.ctx.allocator) catch unreachable, ); @@ -1874,7 +1890,7 @@ pub const RequestContext = struct { break :brk full_build.id; }; - var arena = ArenaType.init() catch unreachable; + var arena = MiArena.init() catch unreachable; defer arena.deinit(); var head = Websocket.WebsocketHeader{ @@ -2059,7 +2075,7 @@ pub const RequestContext = struct { pub fn handleWebsocket(ctx: *RequestContext, server: *Server) anyerror!void { ctx.controlled = true; var handler = try WebsocketHandler.addWebsocket(ctx, server); - _ = try std.Thread.spawn(.{}, WebsocketHandler.handle, .{handler}); + server.websocket_threadpool.schedule(ThreadPool.Batch.from(&handler.task)); } threadlocal var client_entry_point: bundler.ClientEntryPoint = undefined; @@ -3012,6 +3028,10 @@ pub const Server = struct { transform_options: Api.TransformOptions, javascript_enabled: bool = false, fallback_only: bool = false, + websocket_threadpool: ThreadPool = ThreadPool.init(.{ + .stack_size = 128 * 1024, // `pthread_attr_setstacksize` does not like 128 KB stack size + .max_threads = std.math.maxInt(u32), + }), pub var editor: ?Editor = null; pub var editor_name: string = ""; @@ -3513,7 +3533,7 @@ pub const Server = struct { }; var request_arena = server.allocator.create(Arena) catch unreachable; - request_arena.* = Arena.init() catch unreachable; + request_arena.* = ArenaType.init() catch unreachable; req_ctx_ = RequestContext.init( req, @@ -3533,6 +3553,7 @@ pub const Server = struct { const is_navigation_request = req_ctx_.isBrowserNavigation(); defer if (is_navigation_request == .yes) Analytics.enqueue(Analytics.EventName.http_build); req_ctx.parseOrigin(); + // req_ctx.appendHeader("Date", value: string) outer: { const now = DateTime.Datetime.now(); req_ctx.appendHeader( @@ -3628,10 +3649,9 @@ pub const Server = struct { if (req_ctx.header("Connection")) |connection| { req_ctx.keep_alive = strings.eqlInsensitive(connection, "keep-alive"); } - - conn.client.setKeepAlive(req_ctx.keep_alive) catch {}; } else { req_ctx.keep_alive = false; + req_ctx.appendHeader("Connection", "close"); } var finished = req_ctx.handleReservedRoutes(server) catch |err| { @@ -3850,6 +3870,8 @@ pub const Server = struct { server.bundler.options.routes.static_dir, ); + server.websocket_threadpool.on_thread_spawn = RequestContext.WebsocketHandler.onSpawnThread; + if (server.bundler.router != null and server.bundler.options.routes.static_dir_enabled) { if (!public_folder_is_top_level) { try server.run( |