aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-08-31 22:38:05 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-08-31 22:38:05 -0700
commitca9c87f9d76159a286469a230c84e7c5a2c3ef6b (patch)
tree888b5b9c081630318e870fcf686368b19458a98a
parent4528b9938f54ceeb6bf5d90a9266eb2070292623 (diff)
parenta626a07ee86d7b5dd8441002b2c0ba0bf9b68220 (diff)
downloadbun-ca9c87f9d76159a286469a230c84e7c5a2c3ef6b.tar.gz
bun-ca9c87f9d76159a286469a230c84e7c5a2c3ef6b.tar.zst
bun-ca9c87f9d76159a286469a230c84e7c5a2c3ef6b.zip
Merge branch 'jarred/usockets-direction' into jarred/new-http
-rw-r--r--.vscode/launch.json13
-rw-r--r--Makefile8
-rw-r--r--src/deps/uws.zig72
-rw-r--r--src/http_server.zig907
-rw-r--r--src/io/io_linux.zig2
5 files changed, 407 insertions, 595 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 932a71dc4..36f98a9bb 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -99,8 +99,17 @@
{
"type": "lldb",
"request": "launch",
- "name": "fetch debug",
- "program": "${workspaceFolder}/misctools/fetch",
+ "name": "toyhttpserver debug",
+ "program": "${workspaceFolder}/misctools/toyhttpserver",
+ "args": ["https://example.com", "--verbose", "3001"],
+ "cwd": "${workspaceFolder}",
+ "console": "internalConsole"
+ },
+ {
+ "type": "lldb",
+ "request": "launch",
+ "name": "toyhttpserver lite debug",
+ "program": "${workspaceFolder}/misctools/toyhttpserver-lite",
"args": ["https://example.com", "--verbose"],
"cwd": "${workspaceFolder}",
"console": "internalConsole"
diff --git a/Makefile b/Makefile
index 9eb4c8500..1559fc188 100644
--- a/Makefile
+++ b/Makefile
@@ -797,23 +797,23 @@ httpbench-release: $(IO_FILES)
.PHONY: toyhttpserver-debug
toyhttpserver-debug:
$(ZIG) build toyhttpserver-obj
- $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES)
+ $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
.PHONY: toyhttpserver
toyhttpserver:
$(ZIG) build -Drelease-fast toyhttpserver-obj
- $(CXX) $(PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES)
+ $(CXX) $(PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
rm -rf $(PACKAGE_DIR)/toyhttpserver.o
.PHONY: toyhttpserver-lite-debug
toyhttpserver-lite-debug:
$(ZIG) build toyhttpserver-lite-obj
- $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES)
+ $(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
.PHONY: toyhttpserver-lite
toyhttpserver-lite:
$(ZIG) build -Drelease-fast toyhttpserver-lite-obj
- $(CXX) $(PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES)
+ $(CXX) $(PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
rm -rf $(PACKAGE_DIR)/toyhttpserver-lite.o
.PHONY: check-glibc-version-dependency
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index c545de2df..ea033f546 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -17,6 +17,10 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
socket: *Socket,
const ThisSocket = @This();
+ pub fn handle(this: ThisSocket) ?*anyopaque {
+ return us_socket_get_native_handle(comptime ssl_int, this.socket);
+ }
+
pub fn isEstablished(this: ThisSocket) bool {
return us_socket_is_established(comptime ssl_int, this.socket) > 0;
}
@@ -36,7 +40,7 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return us_socket_context(
comptime ssl_int,
this.socket,
- );
+ ).?;
}
pub fn flush(this: ThisSocket) void {
return us_socket_flush(
@@ -101,6 +105,56 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
);
}
+ pub fn attach(fd: c_int, ctx: *us_socket_context_t) ?ThisSocket {
+ if (us_socket_attach(comptime ssl_int, fd, ctx)) |socket| {
+ return ThisSocket{ .socket = socket };
+ } else {
+ return null;
+ }
+ }
+
+ pub fn detach(this: ThisSocket) c_int {
+ const handle_ = this.handle().?;
+ var socket_ = this.socket;
+ if (us_socket_detach(comptime ssl_int, socket_) == null)
+ return -1;
+
+ return @intCast(c_int, @ptrToInt(handle_));
+ }
+
+ pub fn listen(
+ host: []const u8,
+ port: c_int,
+ socket_ctx: *us_socket_context_t,
+ comptime Context: type,
+ ctx: Context,
+ comptime socket_field_name: []const u8,
+ ) ?*Context {
+ // var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator);
+ // var allocator = stack_fallback.get();
+ // var host_ = allocator.dupeZ(u8, host) catch return null;
+ // defer allocator.free(host_);
+ _ = host;
+
+ var socket = us_socket_context_listen(
+ comptime ssl_int,
+ socket_ctx,
+ null,
+ port,
+ 0,
+ @sizeOf(Context),
+ ) orelse return null;
+ const socket_ = ThisSocket{ .socket = @ptrCast(*Socket, socket) };
+ var holder = socket_.ext(Context) orelse {
+ if (comptime bun.Environment.allow_assert) unreachable;
+ _ = us_socket_close_connecting(comptime ssl_int, socket_.socket);
+ return null;
+ };
+ holder.* = ctx;
+ @field(holder.*, socket_field_name) = socket;
+ return holder;
+ }
+
pub fn connect(
host: []const u8,
port: c_int,
@@ -316,7 +370,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con
extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void;
extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void;
extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
-extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
+pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void;
extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void;
@@ -325,7 +379,7 @@ extern fn us_socket_context_on_writable(ssl: c_int, context: ?*us_socket_context
extern fn us_socket_context_on_timeout(ssl: c_int, context: ?*us_socket_context_t, on_timeout: fn (*Socket) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_connect_error(ssl: c_int, context: ?*us_socket_context_t, on_connect_error: fn (*Socket, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_end(ssl: c_int, context: ?*us_socket_context_t, on_end: fn (*Socket) callconv(.C) ?*Socket) void;
-extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
+pub extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
extern fn us_socket_context_listen(ssl: c_int, context: ?*us_socket_context_t, host: [*c]const u8, port: c_int, options: c_int, socket_ext_size: c_int) ?*listen_socket_t;
@@ -391,8 +445,8 @@ pub const Poll = opaque {
pub const write_flag = if (Environment.isLinux) std.os.linux.EPOLL.OUT else 2;
};
- pub fn deinit(self: *Poll) void {
- us_poll_free(self);
+ pub fn deinit(self: *Poll, loop: *Loop) void {
+ us_poll_free(self, loop);
}
// (void* userData, int fd, int events, int error, struct us_poll_t *poll)
@@ -503,7 +557,11 @@ pub const Request = opaque {
extern fn uws_req_get_parameter(res: *Request, index: c_ushort, dest: *[*]const u8) usize;
};
-const listen_socket_t = opaque {};
+pub const listen_socket_t = opaque {
+ pub fn close(this: *listen_socket_t, comptime ssl: bool) void {
+ us_listen_socket_close(comptime @as(c_int, @boolToInt(ssl)), this);
+ }
+};
extern fn us_listen_socket_close(ssl: c_int, ls: *listen_socket_t) void;
pub fn NewApp(comptime ssl: bool) type {
@@ -1148,3 +1206,5 @@ pub const uws_app_listen_config_t = extern struct {
};
extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void;
+extern fn us_socket_detach(ssl: c_int, socket: *Socket) ?*Socket;
+extern fn us_socket_attach(ssl: c_int, client_fd: c_int, ctx: *us_socket_context_t) ?*Socket;
diff --git a/src/http_server.zig b/src/http_server.zig
index 302712df3..31cdb0377 100644
--- a/src/http_server.zig
+++ b/src/http_server.zig
@@ -19,6 +19,8 @@ const StringPointer = @import("./api/schema.zig").Api.StringPointer;
const StringBuilder = @import("./string_builder.zig");
const Lock = @import("./lock.zig").Lock;
const log = Output.scoped(.HTTPServer, false);
+const uWS = @import("uws");
+const adjustUlimit = @import("./fs.zig").FileSystem.RealFS.adjustUlimit;
const ServerConfig = struct {
port: u16 = 3001,
@@ -33,7 +35,6 @@ pub const constants = struct {
const FallbackBufferPool = ObjectPool([16384]u8, null, false, 1024);
-const SocketList = HiveArray(Socket, constants.SOCKET_BACKLOG);
const IncomingRequest = struct {
http_request: HTTPRequest,
body_chunk: []const u8 = "",
@@ -41,20 +42,18 @@ const IncomingRequest = struct {
bytes: []u8,
pub fn freeData(this: *IncomingRequest, allocator: std.mem.Allocator) void {
- _ = this;
- _ = allocator;
- // if (this.bytes.len > 0)
- // allocator.free(this.bytes);
- // this.bytes.len = 0;
- // this.bytes.ptr = undefined;
- // this.body_chunk = "";
- // if (this.http_request.headers.len > 0)
- // allocator.free(this.http_request.headers);
- // this.http_request.headers.len = 0;
- // this.http_request.headers.ptr = undefined;
- }
-
- pub fn create(allocator: std.mem.Allocator, request_recv: []u8, fd: fd_t, request: HTTPRequest) !IncomingRequest {
+ if (this.bytes.len > 0)
+ allocator.free(this.bytes);
+ this.bytes.len = 0;
+ this.bytes.ptr = undefined;
+ this.body_chunk = "";
+ if (this.http_request.headers.len > 0)
+ allocator.free(this.http_request.headers);
+ this.http_request.headers.len = 0;
+ this.http_request.headers.ptr = undefined;
+ }
+
+ pub fn create(allocator: std.mem.Allocator, request_recv: []const u8, fd: fd_t, request: HTTPRequest) !IncomingRequest {
var body_chunk = request_recv[@minimum(request.bytes_read, request_recv.len)..];
var string_builder = StringBuilder{};
@@ -74,36 +73,9 @@ const IncomingRequest = struct {
const fd_t = std.os.fd_t;
-const Data = struct {
- value: Value = Value{ .empty = void{} },
- len: u16 = 0,
-
- pub const Value = union(enum) {
- recv_buffer: *RecvBuffer,
- fallback_buffer: *FallbackBufferPool.Node,
- empty: void,
- };
-
- pub fn read(this: Data) []u8 {
- return switch (this.value) {
- .recv_buffer => this.value.recv_buffer[0..this.len],
- .fallback_buffer => this.value.fallback_buffer.data[0..this.len],
- .empty => &.{},
- };
- }
-
- pub fn writable(this: Data) []u8 {
- return switch (this.value) {
- .recv_buffer => this.value.recv_buffer[this.len..],
- .fallback_buffer => this.value.fallback_buffer.data[this.len..],
- .empty => &.{},
- };
- }
-};
-
pub const RequestHandler = struct {
ctx: *anyopaque,
- onRequest: fn (ctx: *anyopaque, incoming: IncomingRequest) bool,
+ onRequest: fn (ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool,
pub fn New(comptime HandlerType: type, comptime Function: anytype) type {
return struct {
@@ -114,214 +86,35 @@ pub const RequestHandler = struct {
};
}
- pub fn onRequest(ctx: *anyopaque, incoming: IncomingRequest) bool {
+ pub fn onRequest(ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool {
if (@typeInfo(@TypeOf(Function)).Fn.return_type.? == void) {
- Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), incoming);
+ Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming);
return true;
}
- return Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), incoming);
+ return Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming);
}
};
}
};
-const recv_buffer_len = 4096;
-const RecvBuffer = [recv_buffer_len]u8;
-const RecvHiveArray = HiveArray(RecvBuffer, 128);
-
-pub fn sendStaticMessageConcurrent(toy: *ToyHTTPServer, fd: fd_t, message: []const u8) void {
- // const CompletionPoolBackup = ObjectPool(AsyncIO.Completion, null, false, 512);
-
- const doSendError = struct {
- pub fn send(
- this: *ToyHTTPServer,
- completion: *AsyncIO.Completion,
- result: AsyncIO.SendError!usize,
- ) void {
- // defer @fieldParentPtr(CompletionPoolBackup.Node, "data", completion).release();
-
- const amt = result catch |err| {
- if (err != error.EBADF)
- sendClose(completion.operation.send.socket);
- return;
- };
- if (completion.operation.send.disconnected) {
- sendClose(completion.operation.send.socket);
- return;
- }
- const remain = completion.operation.send.buf[0..completion.operation.send.len][amt..];
- if (remain.len == 0) {
- this.server.takeAsync(completion.operation.send.socket);
- return;
- }
-
- this.io.sendNow(*ToyHTTPServer, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0);
- }
- }.send;
- toy.io.sendNow(*ToyHTTPServer, toy, doSendError, CompletionPool.get(), fd, message, 0);
-}
-
-const CompletionPool = struct {
- pub fn get() *AsyncIO.Completion {
- return bun.default_allocator.create(AsyncIO.Completion) catch unreachable;
- }
-};
-
-pub fn sendStaticMessage(server: *Server, fd: fd_t, message: []const u8) void {
- const doSendError = struct {
- pub fn send(
- this: *Server,
- completion: *AsyncIO.Completion,
- result: AsyncIO.SendError!usize,
- ) void {
- var amt = result catch {
- sendClose(completion.operation.send.socket);
- return;
- };
- const remain = completion.operation.send.buf[0..completion.operation.send.len][amt..];
- if (completion.operation.send.disconnected) {
- sendClose(completion.operation.send.socket);
- return;
- }
- if (remain.len == 0) {
- this.take(completion.operation.send.socket);
- return;
- }
-
- AsyncIO.global.send(*Server, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0);
- }
- }.send;
- AsyncIO.global.send(*Server, server, doSendError, CompletionPool.get(), fd, message, 0);
-}
-
-pub fn sendStaticMessageWithoutClosing(server: *Server, fd: fd_t, message: []const u8) void {
- const doSendError = struct {
- pub fn send(
- this: *Server,
- completion: *AsyncIO.Completion,
- result: AsyncIO.SendError!usize,
- ) void {
- var amt = result catch {
- sendClose(completion.operation.send.socket);
- return;
- };
- if (completion.operation.send.disconnected) {
- sendClose(completion.operation.send.socket);
- return;
- }
-
- const remain = completion.operation.send.buf[0..completion.operation.send.len][amt..];
- if (remain.len == 0) {
- this.take(completion.operation.send.socket);
- return;
- }
-
- AsyncIO.global.send(*Server, this, send, CompletionPool.get(), completion.operation.send.socket, remain, 0);
- }
- }.send;
- AsyncIO.global.send(*Server, server, doSendError, CompletionPool.get(), fd, message, 0);
-}
-
pub const Server = struct {
- recv_buffer: RecvHiveArray = RecvHiveArray.init(),
- listener: fd_t,
- accept_completion: AsyncIO.Completion = undefined,
+ listener: *uWS.listen_socket_t,
+ ctx: *uWS.us_socket_context_t,
status: Status = Status.open,
- sockets: SocketList = SocketList.init(),
handler: RequestHandler,
- shutdown_completion: AsyncIO.Completion = undefined,
shutdown_requested: bool = false,
-
- pending_sockets_to_return: PendingSocketsList = PendingSocketsList.init(),
- pending_sockets_to_return_lock: Lock = Lock.init(),
- pending_socket_return_task: NetworkThread.Task = .{ .callback = flushPendingSocketsToReturn },
- pending_sockets_to_return_scheduled: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
-
- after_callback: AsyncIO.Callback = .{
- .callback = enqueueAcceptOpaque,
- .ctx = undefined,
- },
-
- has_pending_accept: bool = false,
-
- pub fn flushPendingSocketsToReturn(task: *NetworkThread.Task) void {
- var server: *Server = @fieldParentPtr(Server, "pending_socket_return_task", task);
- server.pending_sockets_to_return_scheduled.store(0, .Monotonic);
- server.pending_sockets_to_return_lock.lock();
- var sockets_slice = server.pending_sockets_to_return.readableSlice(0);
- var stack_fallback = std.heap.stackFallback(4096, bun.default_allocator);
- var allocator = stack_fallback.get();
- var list = allocator.dupe(u32, sockets_slice) catch unreachable;
- server.pending_sockets_to_return.head = 0;
- server.pending_sockets_to_return.count = 0;
- server.pending_sockets_to_return_lock.unlock();
-
- defer {
- if (!stack_fallback.fixed_buffer_allocator.ownsSlice(std.mem.sliceAsBytes(list))) {
- allocator.free(list);
- }
- }
-
- for (list) |fd| {
- if (comptime Environment.isMac) {
- std.os.getsockoptError(@intCast(c_int, fd)) catch {
- sendClose(@intCast(c_int, fd));
- continue;
- };
- }
-
- if (server.sockets.get()) |socket| {
- socket.* = .{
- .fd = @intCast(fd_t, fd),
- .server_ = server,
- };
-
- socket.enqueueRecv() catch unreachable;
- } else {
- sendClose(@intCast(fd_t, fd));
- }
- }
- }
+ loop: *uWS.Loop = undefined,
const PendingSocketsList = std.fifo.LinearFifo(u32, .{ .Static = constants.SOCKET_BACKLOG });
- pub fn takeAsync(this: *Server, socket: fd_t) void {
-
- // var err_code: i32 = undefined;
- // var size: u32 = @sizeOf(u32);
- // _ = std.c.getsockopt(@intCast(fd_t, socket), std.os.SOL.SOCKET, AsyncIO.darwin.SO_NREAD, @ptrCast([*]u8, &err_code), &size);
- // if (err_code == 0) {
- // std.os.shutdown(socket, .both) catch {};
- // std.os.close(socket);
- // }
- // return;
- // }
-
- this.pending_sockets_to_return_lock.lock();
- {
- this.pending_sockets_to_return.writeItemAssumeCapacity(@intCast(u32, socket));
- }
- if (this.pending_sockets_to_return_scheduled.fetchAdd(1, .Monotonic) == 0)
- NetworkThread.global.schedule(NetworkThread.Batch.from(&this.pending_socket_return_task));
- this.pending_sockets_to_return_lock.unlock();
- }
-
- pub fn take(server: *Server, fd: fd_t) void {
- if (server.sockets.get()) |socket| {
- socket.* = .{
- .fd = @intCast(fd_t, fd),
- .server_ = server,
- };
-
- socket.enqueueRecv() catch unreachable;
- } else {
- sendClose(@intCast(fd_t, fd));
- }
- }
-
pub fn quiet(this: *Server) void {
+ if (this.status != .open)
+ return;
+
this.status = .closing;
+ this.listener.close(false);
+ this.status = .closed;
}
pub const Status = enum {
@@ -336,23 +129,6 @@ pub const Server = struct {
this.shutdown_requested = true;
log("shutdown");
this.quiet();
- AsyncIO.global.nextTick(*Server, this, doShutdown, &this.shutdown_completion);
- }
-
- pub fn doShutdown(this: *Server, _: *AsyncIO.Completion, _: void) void {
- log("doShutdown");
- this.status = Status.closed;
- var iter = this.sockets.available.iterator(.{
- .kind = .unset,
- });
-
- while (iter.next()) |id| {
- this.sockets.buffer[id].reset();
- this.sockets.buffer[id].closeWithoutReset();
- }
-
- this.sockets = SocketList.init();
- this.recv_buffer = RecvHiveArray.init();
}
pub fn boot() void {}
@@ -361,138 +137,57 @@ pub const Server = struct {
log("start port: {d}", .{config.port});
var server = try bun.default_allocator.create(Server);
+ var ctx = server.createContext() orelse return error.OutOfMemory;
+ uWS.SocketTCP.configure(
+ ctx,
+ Connection,
+ Connection.onOpen,
+ Connection.onClose,
+ Connection.onData,
+ Connection.onWritable,
+ Connection.onTimeout,
+ Connection.onConnectError,
+ Connection.onEnd,
+ );
+
server.* = .{
- .listener = brk: {
- if (comptime Environment.isMac) {
- break :brk AsyncIO.createListenSocket(config.host, config.port, config.reuse_port);
- } else {
- break :brk try AsyncIO.openSocket(std.os.AF.INET, constants.OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP);
- }
- },
+ .listener = undefined,
+ .ctx = ctx,
.handler = handler,
+ .status = .open,
+ .loop = uWS.Loop.get().?,
};
- server.after_callback.ctx = server;
- {
- // var listener: std.x.net.tcp.Listener = .{
- // .socket = .{
- // .fd = server.listener,
- // },
- // };
- server.enqueueAccept();
- // listener.setKeepAlive(false) catch {};
- // try listener.bind(std.x.net.ip.Address.initIPv4(std.x.os.IPv4.unspecified, config.port));
- // try listener.listen(constants.SOCKET_BACKLOG);
+ if (uWS.SocketTCP.listen(config.host, config.port, ctx, *Server, server, "listener") == null) {
+ return error.ListenFailed;
}
- // try AsyncIO.global.on_after.append(bun.default_allocator, server.after_callback);
- return server;
- }
-
- pub fn enqueueAcceptOpaque(server: *anyopaque) void {
- enqueueAccept(@ptrCast(*Server, @alignCast(@alignOf(Server), server)));
- }
-
- pub fn enqueueAccept(server: *Server) void {
- AsyncIO.global.acceptNow(*Server, server, onAccept, &server.accept_completion, server.listener);
- }
-
- pub fn onAccept(
- this: *Server,
- complete: *AsyncIO.Completion,
- result_: AsyncIO.AcceptError!std.os.socket_t,
- ) void {
- var remain: usize = @as(usize, complete.operation.accept.backlog);
-
- var fd = result_ catch |err| {
- log("onAccept error: {s}", .{@errorName(err)});
- return;
+ server.* = .{
+ .listener = server.listener,
+ .ctx = ctx,
+ .handler = handler,
+ .status = .open,
+ .loop = uWS.Loop.get().?,
};
-
- _ = AsyncIO.Syscall.fcntl(fd, std.os.F.SETFL, (AsyncIO.Syscall.fcntl(fd, std.os.F.GETFL, 0) catch 0) | std.os.O.NONBLOCK | std.os.O.CLOEXEC) catch 0;
-
- if (this.handleAccept(fd)) {
- if (comptime Environment.isMac) {
- while (remain > 0) : (remain -= 1) {
- const sockfd = AsyncIO.darwin.@"accept$NOCANCEL"(this.listener, null, null);
- if (sockfd < 0) {
- break;
- }
- fd = sockfd;
-
- AsyncIO.Syscall.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, &std.mem.toBytes(@as(c_int, 1))) catch {};
- _ = AsyncIO.Syscall.fcntl(fd, std.os.F.SETFL, (AsyncIO.Syscall.fcntl(fd, std.os.F.GETFL, 0) catch 0) | std.os.O.NONBLOCK | std.os.O.CLOEXEC) catch 0;
- // _ = AsyncIO.Syscall.fcntl(fd, std.os.FD_CLOEXEC, 1) catch 0;
-
- if (!this.handleAccept(fd))
- break;
- }
- }
- }
+ return server;
}
-
- fn handleAccept(this: *Server, fd: std.os.socket_t) bool {
- if (this.status == .closing or this.status == .closed) {
- log("onAccept closing fd: {d} because not accepting connections", .{fd});
- std.os.close(fd);
- return false;
- }
-
- var socket = this.sockets.get() orelse {
- log("onAccept closing fd: {d} because no sockets available", .{fd});
- std.os.close(fd);
- return false;
- };
-
- socket.* = .{
- .fd = fd,
- .server_ = this,
- };
-
- socket.enqueueRecv() catch {
- log("onAccept closing fd: {d} because enqueueRecv failed", .{fd});
- std.os.close(fd);
- std.debug.assert(this.sockets.put(socket));
- };
- return true;
+ pub fn createContext(server: *Server) ?*uWS.us_socket_context_t {
+ var loop = uWS.Loop.get().?;
+ var ctx = uWS.us_create_socket_context(0, loop, @sizeOf(*Server), .{}) orelse return null;
+ var ptr = @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, ctx).?));
+ ptr.* = server;
+ return ctx;
}
- pub fn dispatch(this: *Server, socket: *Socket, request: HTTPRequest) void {
- var incoming_request = IncomingRequest.create(bun.default_allocator, socket.data.read(), socket.fd, request) catch {
- log("Dropping request due to OOM!", .{});
- socket.reset();
- return;
- };
-
- // Reset the data before calling the handler to free up memory for the next request.
- socket.reset();
- std.debug.assert(this.sockets.put(socket));
-
- if (!this.handler.onRequest(this.handler.ctx, incoming_request)) {
- log("Dropping request due to handler failure!", .{});
+ pub fn dispatch(this: *Server, connection: *Connection, incoming_request: IncomingRequest) void {
+ if (this.handler.onRequest(this.handler.ctx, connection, incoming_request)) {
return;
}
+ _ = connection.socket.write(bad_request, false);
+ connection.socket.close(0, null);
}
};
-fn sendClose(fd: fd_t) void {
- std.os.getsockoptError(fd) catch {};
- std.os.shutdown(fd, std.os.ShutdownHow.both) catch {};
-
- if (comptime Environment.isLinux) {
- const Closer = struct {
- pub fn onClose(_: void, completion: *AsyncIO.Completion, _: AsyncIO.CloseError!void) void {
- var node = @fieldParentPtr(CompletionPool.Node, "data", completion);
- node.releaase();
- }
- };
-
- AsyncIO.global.close(void, void{}, Closer.onClose, CompletionPool.get(), fd);
- } else {
- std.os.close(fd);
- }
-}
-
const CompletionSwapper = struct {
first: AsyncIO.Completion = undefined,
second: AsyncIO.Completion = undefined,
@@ -549,194 +244,96 @@ const hello_world = "HTTP/1.1 200 OK" ++
CRLF ++ CRLF ++
"Hello, world!";
-pub const Socket = struct {
- recv_completion: CompletionSwapper = CompletionSwapper{},
- fd: fd_t,
- data: Data = .{},
- server_: *Server,
-
- pub fn reset(this: *Socket) void {
- switch (this.data.value) {
- .recv_buffer => |buf| {
- std.debug.assert(this.server().recv_buffer.put(buf));
- this.data = .{ .value = .{ .empty = void{} } };
- },
- .fallback_buffer => |buf| {
- buf.release();
- this.data = .{ .value = .{ .empty = void{} } };
- },
- .empty => {},
- }
- this.recv_completion = CompletionSwapper{};
- }
-
- pub fn consume(this: *Socket, buf: []u8) !void {
- var writable = this.data.writable();
- if (buf.ptr == writable.ptr and writable.len >= buf.len) {
- this.data.len += @truncate(u16, buf.len);
- return;
- } else if (writable.len >= buf.len) {
- @memcpy(writable.ptr, buf.ptr, buf.len);
- this.data.len += @truncate(u16, buf.len);
- return;
- }
- const start_len = this.data.len;
-
- switch (this.data.value) {
- .recv_buffer => |recv| {
- var fallback = FallbackBufferPool.get(bun.default_allocator);
- @memcpy(&fallback.data, recv, start_len);
- std.debug.assert(this.server().recv_buffer.put(recv));
- @memcpy(fallback.data[start_len..].ptr, buf.ptr, buf.len);
- this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len + start_len) };
- },
- .fallback_buffer => {
- return error.TooBig;
- },
- .empty => {
- if (buf.len <= recv_buffer_len) {
- if (this.server().recv_buffer.get()) |recv| {
- @memcpy(recv, buf.ptr, buf.len);
- this.data = .{ .value = .{ .recv_buffer = recv }, .len = @truncate(u16, buf.len) };
- return;
- }
- }
-
- if (buf.len <= 16384) {
- var fallback = FallbackBufferPool.get(bun.default_allocator);
- @memcpy(&fallback.data, buf.ptr, buf.len);
- this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len) };
- }
-
- return error.TooBig;
- },
- }
- }
-
- pub fn cancelTimeout(this: *Socket) void {
- _ = this;
- }
-
- fn getNextBuffer(this: *Socket) []u8 {
- var next_buffer: []u8 = this.data.writable();
-
- if (next_buffer.len < 512) {
- var buf = this.data.read();
- if (buf.len == 0) {
- if (this.server().recv_buffer.get()) |recv| {
- this.data = .{ .value = .{ .recv_buffer = recv }, .len = @truncate(u16, buf.len) };
- return this.data.writable();
- }
- }
-
- if (this.data.value == .recv_buffer) {
- var fallback = FallbackBufferPool.get(bun.default_allocator);
- @memcpy(&fallback.data, buf.ptr, buf.len);
- this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = @truncate(u16, buf.len) };
- return this.data.writable();
- }
+pub const Connection = struct {
+ socket: uWS.SocketTCP,
+ incoming_request: IncomingRequest = undefined,
+ is_writable: bool = false,
+ has_received: bool = false,
+ has_incoming_request: bool = false,
- if (this.data.value == .empty) {
- var fallback = FallbackBufferPool.get(bun.default_allocator);
- this.data = .{ .value = .{ .fallback_buffer = fallback }, .len = 0 };
- return this.data.writable();
- }
- }
-
- return next_buffer;
+ pub fn onOpen(this: *Connection, socket: uWS.SocketTCP) void {
+ this.socket = socket;
+ socket.timeout(30);
+ this.is_writable = false;
+ log("Client connected", .{});
}
- pub fn enqueueRecv(this: *Socket) !void {
- this.setTimeout();
-
- var next_buffer = this.getNextBuffer();
- if (next_buffer.len == 0) {
- return error.TooBig;
- }
-
- AsyncIO.global.recv(
- *Socket,
- this,
- Socket.onRecv,
- CompletionPool.get(),
- this.fd,
- next_buffer,
- );
+ fn dispatch(this: *Connection, incoming_request: IncomingRequest) void {
+ this.has_received = false;
+ this.is_writable = false;
+ this.server().dispatch(this, incoming_request);
+ return;
}
- pub fn close(this: *Socket) void {
- this.reset();
+ pub fn onClose(this: *Connection, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void {
+ _ = this;
+ _ = socket;
- this.closeWithoutReset();
- std.debug.assert(this.server().sockets.put(this));
+ log("Client disconnected", .{});
}
- pub fn closeWithoutReset(this: *Socket) void {
- const fd = this.fd;
- std.debug.assert(fd > 0);
- this.fd = 0;
+ pub fn onWritable(this: *Connection, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
- sendClose(fd);
+ this.is_writable = true;
}
- pub fn onRecv(
- this: *Socket,
- completion: *AsyncIO.Completion,
- read_: AsyncIO.RecvError!usize,
- ) void {
- const read = read_ catch |err| {
- log("onRecv error: {s}", .{@errorName(err)});
- this.close();
- return;
- };
-
- if (read == 0) {
- log("onRecv disconnected socket", .{});
- this.close();
- return;
- }
-
- this.consume(completion.operation.recv.buf[0..read]) catch |err| {
- switch (err) {
- error.TooBig => {
- log("onRecv TooBig", .{});
- this.reset();
- sendStaticMessage(this.server(), this.fd, request_header_fields_too_large);
-
- return;
- },
- }
- };
+ pub fn onData(this: *Connection, socket: uWS.SocketTCP, data: []const u8) void {
+ _ = this;
+ _ = socket;
+ _ = data;
+ socket.timeout(30);
var headers: [512]picohttp.Header = undefined;
- const request = HTTPRequest.parse(this.data.read(), &headers) catch |err| {
+ const request = HTTPRequest.parse(data, &headers) catch |err| {
switch (err) {
error.BadRequest => {
log("onRecv bad request", .{});
- this.reset();
- sendStaticMessage(this.server(), this.fd, bad_request);
-
+ this.socket.close(0, null);
return;
},
error.ShortRead => {
- this.enqueueRecv() catch {
- log("onRecv TooBig (on enqueue)", .{});
- this.reset();
- sendStaticMessage(this.server(), this.fd, request_header_fields_too_large);
- };
return;
},
}
};
- log("onRecv request: {any}", .{request});
- this.cancelTimeout();
- this.server().dispatch(this, request);
+
+ const fd = @intCast(fd_t, @ptrToInt(socket.handle().?));
+ // if (this.has_incoming_request) {
+ // this.incoming_request.freeData(bun.default_allocator);
+ // }
+ this.has_received = true;
+ this.has_incoming_request = true;
+ this.dispatch(IncomingRequest.create(bun.default_allocator, data, fd, request) catch {
+ log("Dropping request due to OOM!", .{});
+ this.socket.close(0, null);
+ return;
+ });
+ }
+
+ pub fn onTimeout(this: *Connection, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
+ socket.close(0, null);
+ }
+
+ pub fn onConnectError(this: *Connection, socket: uWS.SocketTCP, code: c_int) void {
+ _ = this;
+ _ = socket;
+ _ = code;
}
- pub fn setTimeout(_: *Socket) void {}
+ pub fn onEnd(this: *Connection, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
+
+ socket.shutdown();
+ socket.close(0, null);
+ }
- pub fn server(this: *Socket) *Server {
- return this.server_;
+ pub inline fn server(this: Connection) *Server {
+ return @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, this.socket.context())).?).*;
}
};
@@ -745,128 +342,274 @@ const NetworkThread = @import("./network_thread.zig");
pub const ToySingleThreadedHTTPServer = struct {
pub const Handler = RequestHandler.New(ToySingleThreadedHTTPServer, onRequest);
server: *Server,
- task: NetworkThread.Task = .{ .callback = startServer },
pub fn onRequest(
this: *ToySingleThreadedHTTPServer,
- incoming: IncomingRequest,
- ) void {
- log("onRequest: {any}", .{incoming});
- sendStaticMessageWithoutClosing(this.server, incoming.fd, hello_world);
- var inc = incoming;
- inc.freeData(bun.default_allocator);
- }
-
- pub fn drain(_: *ToySingleThreadedHTTPServer) void {}
-
- pub fn loop(this: *ToySingleThreadedHTTPServer) void {
- this.drain();
+ connection: *Connection,
+ _: IncomingRequest,
+ ) bool {
+ _ = this;
- while (true) {
- AsyncIO.global.wait(this, drain);
+ const wrote = connection.socket.write(hello_world, true);
+ if (wrote < hello_world.len) {
+ log("onRequest: write failed", .{});
+ connection.socket.close(0, null);
+ return false;
}
+
+ // incoming.freeData(bun.default_allocator);
+ return true;
}
- pub fn startServer(task: *NetworkThread.Task) void {
+ pub fn startServer(toy: *ToySingleThreadedHTTPServer) void {
var toy_config = ServerConfig{
.port = std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001,
};
defer Output.prettyln("Server started on port {d}", .{toy_config.port});
defer Output.flush();
- var toy = @fieldParentPtr(ToySingleThreadedHTTPServer, "task", task);
toy.server = Server.start(toy_config, RequestHandler.New(ToySingleThreadedHTTPServer, onRequest).init(toy)) catch unreachable;
+ toy.server.loop.run();
}
pub fn main() anyerror!void {
var http = try bun.default_allocator.create(ToySingleThreadedHTTPServer);
+ http.* = .{ .server = undefined };
var stdout_ = std.io.getStdOut();
var stderr_ = std.io.getStdErr();
var output_source = Output.Source.init(stdout_, stderr_);
Output.Source.set(&output_source);
+ _ = try adjustUlimit();
defer Output.flush();
- try NetworkThread.init();
- http.* = .{
- .server = undefined,
- };
- NetworkThread.global.schedule(NetworkThread.Batch.from(&http.task));
- while (true) {
- std.time.sleep(std.time.ns_per_hour);
- }
+ startServer(http);
}
};
pub const ToyHTTPServer = struct {
pub const Handler = RequestHandler.New(*ToyHTTPServer, onRequest);
- const Fifo = std.fifo.LinearFifo(IncomingRequest, .Dynamic);
+ const Fifo = std.fifo.LinearFifo(IncomingRequest, .{ .Static = 4096 });
server: *Server,
- pending: Fifo,
+ first_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable,
+ second_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable,
+ first_is_active: bool = true,
+ incoming_list: *std.BoundedArray(IncomingRequest, 8096) = undefined,
+ outgoing_list: std.atomic.Atomic(*std.BoundedArray(IncomingRequest, 8096)) = undefined,
active: Fifo,
lock: Lock = Lock.init(),
- io: AsyncIO,
- task: NetworkThread.Task = .{ .callback = startServer },
+ loop: *uWS.Loop,
+ has_scheduled: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
+ ctx: *uWS.us_socket_context_t = undefined,
+ waker: AsyncIO.Waker = undefined,
+ // active_requests: HiveArray(WritableSocket, 1024) = HiveArray(WritableSocket, 1024).init(),
pub fn onRequest(
this: *ToyHTTPServer,
+ _: *Connection,
incoming: IncomingRequest,
- ) void {
+ ) bool {
{
this.lock.lock();
- this.pending.writeItem(incoming) catch unreachable;
defer this.lock.unlock();
+ this.outgoing_list.loadUnchecked().appendAssumeCapacity(incoming);
}
- this.io.waker.wake() catch unreachable;
+
+ if (this.outgoing_list.loadUnchecked().len > 20)
+ this.waker.wake() catch unreachable;
+
+ return true;
}
pub fn drain(this: *ToyHTTPServer) void {
- const all = this.pending.readableSlice(0);
- this.active.write(all) catch unreachable;
- this.pending.count = 0;
- this.pending.head = 0;
- }
+ var ctx = this.ctx;
- pub fn loop(this: *ToyHTTPServer) void {
- this.drain();
+ {
+ this.lock.lock();
+ defer {
+ this.lock.unlock();
+ }
- while (true) {
- while (this.active.readItem()) |incoming| {
- // defer incoming.freeData(bun.default_allocator);
- sendStaticMessageConcurrent(this, incoming.fd, hello_world);
+ if (this.first_is_active) {
+ this.first_is_active = false;
+ this.second_list.len = 0;
+
+ this.incoming_list = this.outgoing_list.value;
+ this.outgoing_list.value = &this.second_list;
+ } else {
+ this.first_is_active = true;
+ this.first_list.len = 0;
+
+ this.incoming_list = this.outgoing_list.value;
+ this.outgoing_list.value = &this.first_list;
}
+ }
- this.io.wait(this, drain);
+ const slice = this.incoming_list.slice();
+ for (slice) |incoming| {
+ const sent = AsyncIO.darwin.@"sendto"(incoming.fd, hello_world, hello_world.len, 0, null, 0);
+ if (sent < hello_world.len) {
+ var socket = uWS.SocketTCP.attach(incoming.fd, ctx) orelse continue;
+ _ = socket.write(hello_world, true);
+ }
}
+ this.incoming_list.len = 0;
}
- pub fn startServer(task: *NetworkThread.Task) void {
+ // pub fn dispatch(this: *ToyHTTPServer, socket: *WritableSocket, _: IncomingRequest) void {
+ // this.server.takeAsync(socket.socket.detach());
+ // }
+
+ pub const WritableSocket = struct {
+ socket: uWS.SocketTCP,
+ incoming_request: IncomingRequest = undefined,
+ is_writable: bool = false,
+ has_received: bool = false,
+ has_incoming_request: bool = false,
+
+ pub fn onOpen(_: *WritableSocket, _: uWS.SocketTCP) void {
+ // this.socket = socket;
+ // socket.timeout(30);
+ // this.is_writable = false;
+ // log("Client connected", .{});
+ }
+
+ pub fn dispatch(this: *WritableSocket) void {
+ this.has_received = false;
+ this.is_writable = false;
+ // this.server().dispatch(this, this.incoming_request);
+ return;
+ }
+
+ pub fn onClose(this: *WritableSocket, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void {
+ _ = this;
+ _ = socket;
+
+ log("Client disconnected", .{});
+ }
+
+ pub fn onWritable(this: *WritableSocket, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
+
+ this.is_writable = true;
+ }
+
+ pub fn onData(this: *WritableSocket, socket: uWS.SocketTCP, data: []const u8) void {
+ _ = this;
+ _ = socket;
+ _ = data;
+ // socket.timeout(30);
+
+ // var headers: [512]picohttp.Header = undefined;
+ // const request = HTTPRequest.parse(data, &headers) catch |err| {
+ // switch (err) {
+ // error.BadRequest => {
+ // log("onRecv bad request", .{});
+ // this.socket.close(0, null);
+ // return;
+ // },
+ // error.ShortRead => {
+ // return;
+ // },
+ // }
+ // };
+
+ // const fd = @intCast(fd_t, @ptrToInt(socket.handle().?));
+ // if (this.has_incoming_request) {
+ // this.incoming_request.freeData(bun.default_allocator);
+ // }
+ // this.incoming_request = IncomingRequest.create(bun.default_allocator, data, fd, request) catch {
+ // log("Dropping request due to OOM!", .{});
+ // this.socket.close(0, null);
+ // return;
+ // };
+ // this.has_received = true;
+ // this.has_incoming_request = true;
+ // this.dispatch();
+ }
+
+ pub fn onTimeout(this: *WritableSocket, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
+ socket.close(0, null);
+ }
+
+ pub fn onConnectError(this: *WritableSocket, socket: uWS.SocketTCP, code: c_int) void {
+ _ = this;
+ _ = socket;
+ _ = code;
+ }
+
+ pub fn onEnd(this: *WritableSocket, socket: uWS.SocketTCP) void {
+ _ = this;
+ _ = socket;
+
+ socket.shutdown();
+ socket.close(0, null);
+ }
+
+ pub inline fn server(this: WritableSocket) *ToyHTTPServer {
+ return @ptrCast(**ToyHTTPServer, @alignCast(@alignOf(*ToyHTTPServer), uWS.us_socket_context_ext(0, this.socket.context())).?).*;
+ }
+ };
+
+ fn scheduleWakeup(this: *ToyHTTPServer) void {
+ if (this.has_scheduled.load(.Monotonic) == 0) return;
+
+ this.waker.wake() catch unreachable;
+ }
+ pub fn startServer(toy: *ToyHTTPServer) void {
+ Output.Source.configureNamedThread("ToyHTTPServer");
var toy_config = ServerConfig{
.port = std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001,
};
defer Output.prettyln("Server started on port {d}", .{toy_config.port});
defer Output.flush();
- var toy = @fieldParentPtr(ToyHTTPServer, "task", task);
toy.server = Server.start(toy_config, RequestHandler.New(ToyHTTPServer, onRequest).init(toy)) catch unreachable;
+ _ = toy.server.loop.addPostHandler(*ToyHTTPServer, toy, scheduleWakeup);
+ toy.server.loop.run();
}
pub fn main() anyerror!void {
var http = try bun.default_allocator.create(ToyHTTPServer);
-
var stdout_ = std.io.getStdOut();
var stderr_ = std.io.getStdErr();
var output_source = Output.Source.init(stdout_, stderr_);
+ _ = try adjustUlimit();
Output.Source.set(&output_source);
defer Output.flush();
- try NetworkThread.init();
http.* = .{
- .pending = Fifo.init(bun.default_allocator),
- .active = Fifo.init(bun.default_allocator),
- .io = try AsyncIO.init(1024, 0, try AsyncIO.Waker.init(bun.default_allocator)),
+ .active = Fifo.init(),
.server = undefined,
+ .loop = uWS.Loop.get().?,
+ .waker = AsyncIO.Waker.init(bun.default_allocator) catch unreachable,
};
- NetworkThread.global.schedule(NetworkThread.Batch.from(&http.task));
- http.loop();
+ http.incoming_list = &http.first_list;
+ http.outgoing_list.value = &http.second_list;
+
+ http.ctx = uWS.us_create_socket_context(0, http.loop, 8, .{}).?;
+ uWS.SocketTCP.configure(
+ http.ctx,
+ WritableSocket,
+ WritableSocket.onOpen,
+ WritableSocket.onClose,
+ WritableSocket.onData,
+ WritableSocket.onWritable,
+ WritableSocket.onTimeout,
+ WritableSocket.onConnectError,
+ WritableSocket.onEnd,
+ );
+
+ @ptrCast(**anyopaque, @alignCast(@alignOf(*anyopaque), uWS.us_socket_context_ext(0, http.ctx).?)).* = @ptrCast(*anyopaque, &http);
+ _ = http.loop.addPostHandler(*ToyHTTPServer, http, drain);
+ var thread = std.Thread.spawn(.{}, startServer, .{http}) catch unreachable;
+ http.drain();
+ thread.detach();
+ while (true) {
+ _ = http.waker.wait() catch 0;
+ http.drain();
+ }
}
};
diff --git a/src/io/io_linux.zig b/src/io/io_linux.zig
index af5382053..ccefa4a1b 100644
--- a/src/io/io_linux.zig
+++ b/src/io/io_linux.zig
@@ -995,7 +995,7 @@ pub const Waker = struct {
pub fn wait(this: Waker) !u64 {
var bytes: usize = 0;
- _ = std.os.read(this.fd, @ptrCast(*[8]u8, &bytes)) catch 0;
+ _ = try std.os.read(this.fd, @ptrCast(*[8]u8, &bytes));
return @intCast(u64, bytes);
}