diff options
author | 2022-03-21 21:31:47 -0700 | |
---|---|---|
committer | 2022-03-21 21:31:47 -0700 | |
commit | b3bd413c3baff757d41fefded36f8f18c90a52d3 (patch) | |
tree | 932ec9e6471ab4f2ff9baa704e4b733679c173f5 /src | |
parent | 7cd93e667059d4de5250d46ec109b696cd951603 (diff) | |
download | bun-b3bd413c3baff757d41fefded36f8f18c90a52d3.tar.gz bun-b3bd413c3baff757d41fefded36f8f18c90a52d3.tar.zst bun-b3bd413c3baff757d41fefded36f8f18c90a52d3.zip |
sendfile works
Diffstat (limited to 'src')
-rw-r--r-- | src/deps/_libusockets.h | 2 | ||||
-rw-r--r-- | src/deps/libuwsockets.cpp | 21 | ||||
m--------- | src/deps/uws | 0 | ||||
-rw-r--r-- | src/deps/uws.zig | 65 | ||||
-rw-r--r-- | src/javascript/jsc/api/server.zig | 90 |
5 files changed, 136 insertions, 42 deletions
diff --git a/src/deps/_libusockets.h b/src/deps/_libusockets.h index 8c65d36ab..3b8c61f74 100644 --- a/src/deps/_libusockets.h +++ b/src/deps/_libusockets.h @@ -293,6 +293,8 @@ void uws_res_write_headers(int ssl, uws_res_t *res, const StringPointer *names, void *uws_res_get_native_handle(int ssl, uws_res_t *res); void uws_res_uncork(int ssl, uws_res_t *res); +void uws_res_set_write_offset(int ssl, uws_res_t *res, size_t off); +void us_socket_mark_needs_more_not_ssl(uws_res_t *res); #ifdef __cplusplus } #endif diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 628d30df5..39e0e27fc 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1,6 +1,7 @@ #include "_libusockets.h" #include <string_view> #include <uws/src/App.h> +#include <uws/uSockets/src/internal/internal.h> extern "C" { @@ -794,9 +795,12 @@ void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, void uws_res_end_without_body(int ssl, uws_res_t *res) { if (ssl) { uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; + uwsRes->setWriteOffset(0); uwsRes->endWithoutBody(0); } else { + uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; + uwsRes->setWriteOffset(0); uwsRes->endWithoutBody(0); } } @@ -1011,6 +1015,23 @@ void uws_res_uncork(int ssl, uws_res_t *res) { // } } +void us_socket_mark_needs_more_not_ssl(uws_res_t *res) { + us_socket_t *s = (us_socket_t *)res; + s->context->loop->data.last_write_failed = 1; + us_poll_change(&s->p, s->context->loop, + LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); +} + +void uws_res_set_write_offset(int ssl, uws_res_t *res, size_t off) { + if (ssl) { + uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; + uwsRes->setWriteOffset(off); + } else { + uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; + uwsRes->setWriteOffset(off); + } +} + void uws_res_cork(int ssl, uws_res_t *res, void *ctx, void (*corker)(void *ctx)) { if (ssl) { diff --git a/src/deps/uws b/src/deps/uws -Subproject 86367b941119013e1f41a22b85d0bddcd1261d9 +Subproject ccf3f9dd60726be5977145f151a2c30eb7b489d diff --git a/src/deps/uws.zig b/src/deps/uws.zig index c72fefd71..2f444e4ab 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -63,7 +63,7 @@ extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) voi extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: ?fn (?*us_socket_t, c_int, [*c]u8, c_int) callconv(.C) ?*us_socket_t) void; extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: ?fn (?*us_socket_t, c_int, ?*anyopaque) callconv(.C) ?*us_socket_t) void; extern fn us_socket_context_on_data(ssl: c_int, context: ?*us_socket_context_t, on_data: ?fn (?*us_socket_t, [*c]u8, c_int) callconv(.C) ?*us_socket_t) void; -extern fn us_socket_context_on_writable(ssl: c_int, context: ?*us_socket_context_t, on_writable: ?fn (?*us_socket_t) callconv(.C) ?*us_socket_t) void; +extern fn us_socket_context_on_writable(ssl: c_int, context: ?*us_socket_context_t, on_writable: ?fn (*us_socket_t) callconv(.C) ?*us_socket_t) void; extern fn us_socket_context_on_timeout(ssl: c_int, context: ?*us_socket_context_t, on_timeout: ?fn (?*us_socket_t) callconv(.C) ?*us_socket_t) void; extern fn us_socket_context_on_connect_error(ssl: c_int, context: ?*us_socket_context_t, on_connect_error: ?fn (?*us_socket_t, c_int) callconv(.C) ?*us_socket_t) void; extern fn us_socket_context_on_end(ssl: c_int, context: ?*us_socket_context_t, on_end: ?fn (?*us_socket_t) callconv(.C) ?*us_socket_t) void; @@ -79,7 +79,7 @@ extern fn us_socket_context_adopt_socket(ssl: c_int, context: ?*us_socket_contex extern fn us_create_child_socket_context(ssl: c_int, context: ?*us_socket_context_t, context_ext_size: c_int) ?*us_socket_context_t; pub const Poll = opaque { - extern fn us_create_poll(loop: ?*Loop, fallthrough: c_int, ext_size: c_uint) ?*Poll; + extern fn us_create_poll(loop: ?*Loop, fallthrough: c_int, ext_size: c_uint) *Poll; extern fn us_poll_free(p: ?*Poll, loop: ?*Loop) void; extern fn us_poll_init(p: ?*Poll, fd: c_int, poll_type: c_int) void; extern fn us_poll_start(p: ?*Poll, loop: ?*Loop, events: c_int) void; @@ -410,6 +410,7 @@ pub fn NewApp(comptime ssl: bool) type { pub fn end(res: *Response, data: []const u8, close_connection: bool) void { uws_res_end(ssl_flag, res.downcast(), data.ptr, data.len, close_connection); } + pub fn uncork(_: *Response) void { // uws_res_uncork( // ssl_flag, @@ -443,6 +444,9 @@ pub fn NewApp(comptime ssl: bool) type { pub fn getWriteOffset(res: *Response) uintmax_t { return uws_res_get_write_offset(ssl_flag, res.downcast()); } + pub fn setWriteOffset(res: *Response, offset: anytype) void { + uws_res_set_write_offset(ssl_flag, res.downcast(), @intCast(uintmax_t, offset)); + } pub fn hasResponded(res: *Response) bool { return uws_res_has_responded(ssl_flag, res.downcast()); } @@ -470,6 +474,9 @@ pub fn NewApp(comptime ssl: bool) type { } }; uws_res_on_writable(ssl_flag, res.downcast(), Wrapper.handle, user_data); + if (!ssl) { + us_socket_mark_needs_more_not_ssl(res.downcast()); + } } pub fn onAborted(res: *Response, comptime UserDataType: type, comptime handler: fn (UserDataType, *Response) void, opcional_data: UserDataType) void { const Wrapper = struct { @@ -536,6 +543,57 @@ pub fn NewApp(comptime ssl: bool) type { uws_res_cork(ssl_flag, res.downcast(), opcional_data, Wrapper.handle); } + // pub fn onSocketWritable( + // res: *Response, + // comptime UserDataType: type, + // comptime handler: fn (UserDataType, fd: i32) void, + // opcional_data: UserDataType, + // ) void { + // const Wrapper = struct { + // pub fn handle(user_data: ?*anyopaque, fd: i32) callconv(.C) void { + // if (comptime UserDataType == void) { + // @call(.{ .modifier = .always_inline }, handler, .{ + // void{}, + // fd, + // }); + // } else { + // @call(.{ .modifier = .always_inline }, handler, .{ + // @ptrCast( + // UserDataType, + // @alignCast(@alignOf(UserDataType), user_data.?), + // ), + // fd, + // }); + // } + // } + // }; + + // const OnWritable = struct { + // pub fn handle(socket: *us_socket_t) callconv(.C) ?*us_socket_t { + // if (comptime UserDataType == void) { + // @call(.{ .modifier = .always_inline }, handler, .{ + // void{}, + // fd, + // }); + // } else { + // @call(.{ .modifier = .always_inline }, handler, .{ + // @ptrCast( + // UserDataType, + // @alignCast(@alignOf(UserDataType), user_data.?), + // ), + // fd, + // }); + // } + + // return socket; + // } + // }; + + // var socket_ctx = us_socket_context(ssl_flag, uws_res_get_native_handle(ssl_flag, res)).?; + // var child = us_create_child_socket_context(ssl_flag, socket_ctx, 8); + + // } + pub fn writeHeaders( res: *Response, names: []const Api.StringPointer, @@ -605,6 +663,7 @@ extern fn uws_res_write_header_int(ssl: c_int, res: *uws_res, key: [*c]const u8, extern fn uws_res_end_without_body(ssl: c_int, res: *uws_res) void; extern fn uws_res_write(ssl: c_int, res: *uws_res, data: [*c]const u8, length: usize) bool; extern fn uws_res_get_write_offset(ssl: c_int, res: *uws_res) uintmax_t; +extern fn uws_res_set_write_offset(ssl: c_int, res: *uws_res, uintmax_t) void; extern fn uws_res_has_responded(ssl: c_int, res: *uws_res) bool; extern fn uws_res_on_writable(ssl: c_int, res: *uws_res, handler: ?fn (*uws_res, uintmax_t, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void; extern fn uws_res_on_aborted(ssl: c_int, res: *uws_res, handler: ?fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void; @@ -673,3 +732,5 @@ pub const uws_app_listen_config_t = extern struct { host: [*c]const u8 = null, options: c_int, }; + +extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void; diff --git a/src/javascript/jsc/api/server.zig b/src/javascript/jsc/api/server.zig index 7e2520b99..125d688a7 100644 --- a/src/javascript/jsc/api/server.zig +++ b/src/javascript/jsc/api/server.zig @@ -81,8 +81,10 @@ const uws = @import("uws"); const SendfileContext = struct { fd: i32, + socket_fd: i32 = 0, remain: u32 = 0, offset: i64 = 0, + has_listener: bool = false, }; pub fn NewServer(comptime ssl_enabled: bool) type { @@ -262,62 +264,84 @@ pub fn NewServer(comptime ssl_enabled: bool) type { this.sendfile = undefined; this.finalize(); } - - pub fn onSendfile(this: *RequestContext, amount_: c_ulong, response: *App.Response) callconv(.C) bool { - const amount = @minimum(@truncate(u32, amount_), this.sendfile.remain); - - if (amount == 0 or this.aborted) { - this.cleanupAfterSendfile(); - return true; - } - - const adjusted_count_temporary = @minimum(amount, @as(u63, std.math.maxInt(i32))); + const separator: string = "\r\n"; + const separator_iovec = [1]std.os.iovec_const{.{ + .iov_base = separator.ptr, + .iov_len = separator.len, + }}; + + pub fn onSendfile(this: *RequestContext) bool { + const adjusted_count_temporary = @minimum(@as(u63, this.sendfile.remain), @as(u63, std.math.maxInt(i32))); // TODO we should not need this int cast; improve the return type of `@minimum` const adjusted_count = @intCast(u63, adjusted_count_temporary); var sbytes: std.os.off_t = adjusted_count; const signed_offset = @bitCast(i64, this.sendfile.offset); if (Environment.isLinux) { - const sent = @truncate( - u32, - std.os.linux.sendfile(response.getNativeHandle(), this.sendfile.fd, &this.sendfile.offset, amount), - ); + const start = this.sendfile.offset; + const val = + std.os.linux.sendfile(this.sendfile.socket_fd, this.sendfile.fd, &this.sendfile.offset, this.sendfile.adjusted_count); + + const sent = @intCast(u32, this.sendfile.offset - start); + const errcode = std.os.linux.getErrno(val); this.sendfile.offset += sent; this.sendfile.remain -= sent; - if (sent == 0 or this.aborted or this.sendfile.remain == 0) { + if (errcode != .AGAIN or this.aborted or this.sendfile.remain == 0 or val == 0) { + if (errcode != .AGAIN and errcode != .SUCCESS) { + Output.prettyErrorln("Error: {s}", .{@tagName(errcode)}); + Output.flush(); + } this.cleanupAfterSendfile(); - return false; + return errcode != .SUCCESS; } } else { + // var sf_hdr_trailer: std.os.darwin.sf_hdtr = .{ + // .headers = &separator_iovec, + // .hdr_cnt = 1, + // .trailers = undefined, + // .trl_cnt = 0, + // }; + // const headers = if (this.sendfile.offset == 0) + // &sf_hdr_trailer + // else + // null; + const errcode = std.c.getErrno(std.c.sendfile( this.sendfile.fd, - response.getNativeHandle(), + this.sendfile.socket_fd, signed_offset, &sbytes, null, 0, )); + this.sendfile.offset += sbytes; - this.sendfile.remain -= if (errcode != .SUCCESS) @intCast(u32, sbytes) else 0; - if ((errcode != .AGAIN and errcode != .SUCCESS) or this.aborted or this.sendfile.remain == 0) { + this.sendfile.remain -= @intCast(u32, sbytes); + if (errcode != .AGAIN or this.aborted or this.sendfile.remain == 0 or sbytes == 0) { if (errcode != .AGAIN and errcode != .SUCCESS) { Output.prettyErrorln("Error: {s}", .{@tagName(errcode)}); Output.flush(); } this.cleanupAfterSendfile(); - return false; + return errcode != .SUCCESS; } } - this.resp.onWritable(*RequestContext, onSendfile, this); + this.resp.setWriteOffset(this.sendfile.offset); + this.resp.onWritable(*RequestContext, onWritableSendfile, this); return true; } + pub fn onWritableSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool { + return this.onSendfile(); + } + pub fn onWritablePrepareSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool { this.renderSendFile(this.blob); + return true; } @@ -349,7 +373,8 @@ pub fn NewServer(comptime ssl_enabled: bool) type { this.sendfile = .{ .fd = fd, - .remain = size_, + .remain = size_, // 2 is for \r\n, + .socket_fd = this.resp.getNativeHandle(), }; if (size_ == 0) { @@ -358,24 +383,10 @@ pub fn NewServer(comptime ssl_enabled: bool) type { return; } - { - const wrote = std.os.write( - this.resp.getNativeHandle(), - "\r\n", - ) catch { - this.cleanupAfterSendfile(); - return; - }; - if (wrote == 0) { - this.cleanupAfterSendfile(); - return; - } - } - // if we're not immediately writable, go ahead and try - if (this.sendfile.remain == size_) { - _ = this.onSendfile(size_, this.resp); - } + _ = std.os.write(this.sendfile.socket_fd, "\r\n") catch 0; + + _ = this.onSendfile(); } pub fn renderSendFile(this: *RequestContext, blob: JSC.WebCore.Blob) void { @@ -413,7 +424,6 @@ pub fn NewServer(comptime ssl_enabled: bool) type { this.blob = response.body.use(); this.req.setYield(false); this.setAbortHandler(); - this.resp.onWritable(*RequestContext, onWritablePrepareSendfile, this); if (!this.has_sendfile_ctx) this.renderSendFile(this.blob); return; } |