aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-03-02 21:17:38 -0800
committerGravatar Jarred Sumner <jarred@jarredsumner.com> 2022-03-02 21:17:38 -0800
commit4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff (patch)
tree2ef421c42843345beb9979ab959bdf7764dd5b85
parent117ca5355d4bda2d88bcd55ed389057ed5358839 (diff)
downloadbun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.tar.gz
bun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.tar.zst
bun-4c5eb4b4db851e478b9eb45af0be9b650ee1e3ff.zip
[bun dev] Improve HMR performance by pooling websocket threads
Previously, bun would create a new thread for each websocket connection. Now, it re-uses them Eventually, this should use evented i/o but other changes need to be made to support that
-rw-r--r--src/http.zig96
1 files changed, 59 insertions, 37 deletions
diff --git a/src/http.zig b/src/http.zig
index f848fbc1e..168982077 100644
--- a/src/http.zig
+++ b/src/http.zig
@@ -34,9 +34,12 @@ const DotEnv = @import("./env_loader.zig");
const mimalloc = @import("./allocators/mimalloc.zig");
const MacroMap = @import("./resolver/package_json.zig").MacroMap;
const Analytics = @import("./analytics/analytics_thread.zig");
-const Arena = @import("./mimalloc_arena.zig").Arena;
+const MiArena = @import("./mimalloc_arena.zig").Arena;
+const Arena = MiArena;
const ArenaType = Arena;
const JSON = @import("./json_parser.zig");
+const DateTime = @import("datetime");
+const ThreadPool = @import("thread_pool");
pub fn constStrToU8(s: string) []u8 {
return @intToPtr([*]u8, @ptrToInt(s.ptr))[0..s.len];
}
@@ -99,6 +102,7 @@ pub const RequestContext = struct {
timer: std.time.Timer,
matched_route: ?Router.Match = null,
origin: ZigURL,
+ datetime_buf: [512]u8 = undefined,
full_url: [:0]const u8 = "",
res_headers_count: usize = 0,
@@ -575,18 +579,16 @@ pub const RequestContext = struct {
return std.fmt.comptimePrint("HTTP/1.1 {d} {s}\r\n", .{ code, status_text });
}
- var status_line_error_buf: [1024]u8 = undefined;
- pub fn printStatusLineError(err: anyerror) []const u8 {
- return std.fmt.bufPrint(&status_line_error_buf, "HTTP/1.1 500 {s}\r\n", .{@errorName(err)}) catch unreachable;
+ pub fn printStatusLineError(err: anyerror, buf: []u8) []const u8 {
+ return std.fmt.bufPrint(buf, "HTTP/1.1 500 {s}\r\n", .{@errorName(err)}) catch unreachable;
}
- threadlocal var content_length_header_buf: [64]u8 = undefined;
-
pub fn prepareToSendBody(
ctx: *RequestContext,
length: usize,
comptime chunked: bool,
) !void {
+ var content_length_header_buf: [64]u8 = undefined;
defer {
if (Environment.allow_assert) {
std.debug.assert(!ctx.has_written_last_header);
@@ -657,7 +659,8 @@ pub const RequestContext = struct {
}
pub fn writeStatusError(ctx: *RequestContext, err: anyerror) !void {
- _ = try ctx.writeSocket(printStatusLineError(err), SOCKET_FLAGS);
+ var status_line_error_buf: [1024]u8 = undefined;
+ _ = try ctx.writeSocket(printStatusLineError(err, &status_line_error_buf), SOCKET_FLAGS);
ctx.status = @as(HTTPStatusCode, 500);
}
@@ -852,7 +855,7 @@ pub const RequestContext = struct {
pub fn sendHTMLFile(ctx: *RequestContext, file: std.fs.File) !void {
ctx.appendHeader("Content-Type", MimeType.html.value);
- ctx.appendHeader("Cache-Control", "no-cache");
+ ctx.appendHeader("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0");
defer ctx.done();
@@ -866,15 +869,22 @@ pub const RequestContext = struct {
try ctx.writeStatus(200);
try ctx.prepareToSendBody(content_length, false);
- _ = try std.os.sendfile(
- ctx.conn.client.socket.fd,
- ctx.bundler.options.routes.single_page_app_fd,
- 0,
- content_length,
- &[_]std.os.iovec_const{},
- &[_]std.os.iovec_const{},
- 0,
- );
+ var remain = content_length;
+ while (remain > 0) {
+ const wrote = try std.os.sendfile(
+ ctx.conn.client.socket.fd,
+ ctx.bundler.options.routes.single_page_app_fd,
+ content_length - remain,
+ remain,
+ &[_]std.os.iovec_const{},
+ &[_]std.os.iovec_const{},
+ 0,
+ );
+ if (wrote == 0) {
+ break;
+ }
+ remain -|= wrote;
+ }
}
pub const WatchBuilder = struct {
@@ -1252,7 +1262,7 @@ pub const RequestContext = struct {
}
pub fn handleFetchEventError(this: *HandlerThread, err: anyerror, js_value: JavaScript.JSValue, ctx: *RequestContext) !void {
- var arena = ArenaType.init() catch unreachable;
+ var arena = MiArena.init() catch unreachable;
var allocator = arena.allocator();
defer arena.deinit();
@@ -1470,7 +1480,7 @@ pub const RequestContext = struct {
vm.global.vm().holdAPILock(handler, JavaScript.OpaqueWrap(HandlerThread, startJavaScript));
}
- var __arena: Arena = undefined;
+ var __arena: MiArena = undefined;
pub fn runLoop(vm: *JavaScript.VirtualMachine, thread: *HandlerThread) !void {
var module_map = JavaScript.ZigGlobalObject.getModuleRegistryMap(vm.global);
@@ -1483,7 +1493,7 @@ pub const RequestContext = struct {
}
while (true) {
- __arena = try Arena.init();
+ __arena = MiArena.init() catch unreachable;
JavaScript.VirtualMachine.vm.arena = &__arena;
JavaScript.VirtualMachine.vm.has_loaded = true;
JavaScript.VirtualMachine.vm.tick();
@@ -1599,6 +1609,7 @@ pub const RequestContext = struct {
builder: WatchBuilder,
message_buffer: MutableString,
bundler: Bundler,
+ task: ThreadPool.Task,
pub var open_websockets: std.ArrayList(*WebsocketHandler) = undefined;
var open_websockets_lock = sync.RwLock.init();
pub fn addWebsocket(ctx: *RequestContext, server: *Server) !*WebsocketHandler {
@@ -1610,7 +1621,7 @@ pub const RequestContext = struct {
clone.conn = ctx.conn.*;
try ctx.bundler.clone(server.allocator, &clone.bundler);
ctx.bundler = &clone.bundler;
-
+ clone.task = .{ .callback = onTask };
clone.message_buffer = try MutableString.init(server.allocator, 0);
clone.ctx.conn = &clone.conn;
clone.ctx.log = logger.Log.init(server.allocator);
@@ -1694,18 +1705,23 @@ pub const RequestContext = struct {
}
}
+ pub fn onSpawnThread(_: ?*anyopaque) ?*anyopaque {
+ Global.setThreadName("HMR");
+ Output.Source.configureThread();
+ js_ast.Stmt.Data.Store.create(default_allocator);
+ js_ast.Expr.Data.Store.create(default_allocator);
+ return null;
+ }
+
+ pub fn onTask(self: *ThreadPool.Task) void {
+ handle(@fieldParentPtr(WebsocketHandler, "task", self));
+ }
pub fn handle(self: *WebsocketHandler) void {
- var stdout = std.io.getStdOut();
- // var stdout = std.io.bufferedWriter(stdout_file.writer());
- var stderr = std.io.getStdErr();
- // var stderr = std.io.bufferedWriter(stderr_file.writer());
- var output_source = Output.Source.init(stdout, stderr);
- // defer stdout.flush() catch {};
- // defer stderr.flush() catch {};
- Output.Source.set(&output_source);
- Output.enable_ansi_colors = stderr.isTty();
- js_ast.Stmt.Data.Store.create(self.ctx.allocator);
- js_ast.Expr.Data.Store.create(self.ctx.allocator);
+ defer {
+ js_ast.Stmt.Data.Store.reset();
+ js_ast.Expr.Data.Store.reset();
+ }
+
self.builder.printer = js_printer.BufferPrinter.init(
js_printer.BufferWriter.init(self.ctx.allocator) catch unreachable,
);
@@ -1874,7 +1890,7 @@ pub const RequestContext = struct {
break :brk full_build.id;
};
- var arena = ArenaType.init() catch unreachable;
+ var arena = MiArena.init() catch unreachable;
defer arena.deinit();
var head = Websocket.WebsocketHeader{
@@ -2059,7 +2075,7 @@ pub const RequestContext = struct {
pub fn handleWebsocket(ctx: *RequestContext, server: *Server) anyerror!void {
ctx.controlled = true;
var handler = try WebsocketHandler.addWebsocket(ctx, server);
- _ = try std.Thread.spawn(.{}, WebsocketHandler.handle, .{handler});
+ server.websocket_threadpool.schedule(ThreadPool.Batch.from(&handler.task));
}
threadlocal var client_entry_point: bundler.ClientEntryPoint = undefined;
@@ -3012,6 +3028,10 @@ pub const Server = struct {
transform_options: Api.TransformOptions,
javascript_enabled: bool = false,
fallback_only: bool = false,
+ websocket_threadpool: ThreadPool = ThreadPool.init(.{
+ .stack_size = 128 * 1024, // `pthread_attr_setstacksize` does not like 128 KB stack size
+ .max_threads = std.math.maxInt(u32),
+ }),
pub var editor: ?Editor = null;
pub var editor_name: string = "";
@@ -3513,7 +3533,7 @@ pub const Server = struct {
};
var request_arena = server.allocator.create(Arena) catch unreachable;
- request_arena.* = Arena.init() catch unreachable;
+ request_arena.* = ArenaType.init() catch unreachable;
req_ctx_ = RequestContext.init(
req,
@@ -3533,6 +3553,7 @@ pub const Server = struct {
const is_navigation_request = req_ctx_.isBrowserNavigation();
defer if (is_navigation_request == .yes) Analytics.enqueue(Analytics.EventName.http_build);
req_ctx.parseOrigin();
+ // req_ctx.appendHeader("Date", value: string)
outer: {
const now = DateTime.Datetime.now();
req_ctx.appendHeader(
@@ -3628,10 +3649,9 @@ pub const Server = struct {
if (req_ctx.header("Connection")) |connection| {
req_ctx.keep_alive = strings.eqlInsensitive(connection, "keep-alive");
}
-
- conn.client.setKeepAlive(req_ctx.keep_alive) catch {};
} else {
req_ctx.keep_alive = false;
+ req_ctx.appendHeader("Connection", "close");
}
var finished = req_ctx.handleReservedRoutes(server) catch |err| {
@@ -3850,6 +3870,8 @@ pub const Server = struct {
server.bundler.options.routes.static_dir,
);
+ server.websocket_threadpool.on_thread_spawn = RequestContext.WebsocketHandler.onSpawnThread;
+
if (server.bundler.router != null and server.bundler.options.routes.static_dir_enabled) {
if (!public_folder_is_top_level) {
try server.run(