diff options
author | 2022-03-21 06:32:14 -0700 | |
---|---|---|
committer | 2022-03-21 06:32:14 -0700 | |
commit | 7cd93e667059d4de5250d46ec109b696cd951603 (patch) | |
tree | d39b1df18cf76fc6dac6bdc666ae904406521767 /src | |
parent | fa343fa8adb25a7e307e91a3cd3c2c3f24e0152b (diff) | |
download | bun-7cd93e667059d4de5250d46ec109b696cd951603.tar.gz bun-7cd93e667059d4de5250d46ec109b696cd951603.tar.zst bun-7cd93e667059d4de5250d46ec109b696cd951603.zip |
[bun.js] 2/? Implement `Response.file`, sendfile edition
Diffstat (limited to 'src')
-rw-r--r-- | src/deps/_libusockets.h | 8 | ||||
-rw-r--r-- | src/deps/libuwsockets.cpp | 20 | ||||
-rw-r--r-- | src/deps/uws.zig | 27 | ||||
-rw-r--r-- | src/javascript/jsc/api/html_rewriter.zig | 12 | ||||
-rw-r--r-- | src/javascript/jsc/api/server.zig | 230 | ||||
-rw-r--r-- | src/javascript/jsc/javascript.zig | 7 | ||||
-rw-r--r-- | src/javascript/jsc/webcore/response.zig | 342 |
7 files changed, 506 insertions, 140 deletions
diff --git a/src/deps/_libusockets.h b/src/deps/_libusockets.h index 7042d3d14..8c65d36ab 100644 --- a/src/deps/_libusockets.h +++ b/src/deps/_libusockets.h @@ -5,8 +5,8 @@ #include <stdbool.h> #include <stddef.h> #include <stdint.h> -#include <uws/uSockets/src/libusockets.h> - +#include <uws/src/App.h> +#include <uws/src/AsyncSocket.h> #ifdef __cplusplus extern "C" { #endif @@ -225,7 +225,7 @@ void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection); void uws_res_pause(int ssl, uws_res_t *res); void uws_res_resume(int ssl, uws_res_t *res); -void uws_res_write_continue(int ssl, uws_res_t *res); +void uws_res_write_continwue(int ssl, uws_res_t *res); void uws_res_write_status(int ssl, uws_res_t *res, const char *status, size_t length); void uws_res_write_header(int ssl, uws_res_t *res, const char *key, @@ -291,6 +291,8 @@ void uws_res_write_headers(int ssl, uws_res_t *res, const StringPointer *names, const StringPointer *values, size_t count, const char *buf); +void *uws_res_get_native_handle(int ssl, uws_res_t *res); +void uws_res_uncork(int ssl, uws_res_t *res); #ifdef __cplusplus } #endif diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index 132a22be8..628d30df5 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1001,6 +1001,16 @@ void uws_res_write_headers(int ssl, uws_res_t *res, const StringPointer *names, } } +void uws_res_uncork(int ssl, uws_res_t *res) { + // if (ssl) { + // uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; + // uwsRes->uncork(); + // } else { + // uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; + // uwsRes->uncork(); + // } +} + void uws_res_cork(int ssl, uws_res_t *res, void *ctx, void (*corker)(void *ctx)) { if (ssl) { @@ -1011,4 +1021,14 @@ void uws_res_cork(int ssl, uws_res_t *res, void *ctx, uwsRes->cork([ctx, corker]() { corker(ctx); }); } } + +void *uws_res_get_native_handle(int ssl, uws_res_t *res) { + if (ssl) { + uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; + return uwsRes->getNativeHandle(); + } else { + uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; + return uwsRes->getNativeHandle(); + } +} }
\ No newline at end of file diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 30e23b39a..c72fefd71 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -410,6 +410,12 @@ pub fn NewApp(comptime ssl: bool) type { pub fn end(res: *Response, data: []const u8, close_connection: bool) void { uws_res_end(ssl_flag, res.downcast(), data.ptr, data.len, close_connection); } + pub fn uncork(_: *Response) void { + // uws_res_uncork( + // ssl_flag, + // res.downcast(), + // ); + } pub fn pause(res: *Response) void { uws_res_pause(ssl_flag, res.downcast()); } @@ -426,7 +432,7 @@ pub fn NewApp(comptime ssl: bool) type { uws_res_write_header(ssl_flag, res.downcast(), key.ptr, key.len, value.ptr, value.len); } pub fn writeHeaderInt(res: *Response, key: []const u8, value: u64) void { - uws_res_write_header(ssl_flag, res.downcast(), key.ptr, key.len, value); + uws_res_write_header_int(ssl_flag, res.downcast(), key.ptr, key.len, value); } pub fn endWithoutBody(res: *Response) void { uws_res_end_without_body(ssl_flag, res.downcast()); @@ -440,18 +446,26 @@ pub fn NewApp(comptime ssl: bool) type { pub fn hasResponded(res: *Response) bool { return uws_res_has_responded(ssl_flag, res.downcast()); } + + pub fn getNativeHandle(res: *Response) i32 { + return @intCast(i32, @ptrToInt(uws_res_get_native_handle(ssl_flag, res.downcast()))); + } pub fn onWritable( res: *Response, comptime UserDataType: type, - comptime handler: fn (*Response, uintmax_t, UserDataType) callconv(.C) bool, + comptime handler: fn (UserDataType, uintmax_t, *Response) callconv(.C) bool, user_data: UserDataType, ) void { const Wrapper = struct { - pub fn handle(this: *uws_res, amount: uintmax_t, data: ?*anyopaque) callconv(.C) void { + pub fn handle(this: *uws_res, amount: uintmax_t, data: ?*anyopaque) callconv(.C) bool { if (comptime UserDataType == void) { - @call(.{ .modifier = .always_inline }, handler, .{ void{}, castRes(this), amount }); + return @call(.{ .modifier = .always_inline }, handler, .{ void{}, amount, castRes(this) }); } else { - @call(.{ .modifier = .always_inline }, handler, .{ @ptrCast(UserDataType, @alignCast(@alignOf(UserDataType), data.?)), castRes(this), amount }); + return @call(.{ .modifier = .always_inline }, handler, .{ + @ptrCast(UserDataType, @alignCast(@alignOf(UserDataType), data.?)), + amount, + castRes(this), + }); } } }; @@ -533,7 +547,7 @@ pub fn NewApp(comptime ssl: bool) type { }; }; } - +extern fn uws_res_get_native_handle(ssl: c_int, res: *uws_res) *us_socket_t; extern fn uws_create_app(ssl: c_int, options: us_socket_context_options_t) *uws_app_t; extern fn uws_app_destroy(ssl: c_int, app: *uws_app_t) void; extern fn uws_app_get(ssl: c_int, app: *uws_app_t, pattern: [*c]const u8, handler: uws_method_handler, user_data: ?*anyopaque) void; @@ -580,6 +594,7 @@ extern fn uws_ws_get_buffered_amount(ssl: c_int, ws: ?*uws_websocket_t) c_uint; extern fn uws_ws_get_remote_address(ssl: c_int, ws: ?*uws_websocket_t, dest: [*c][*c]const u8) usize; extern fn uws_ws_get_remote_address_as_text(ssl: c_int, ws: ?*uws_websocket_t, dest: [*c][*c]const u8) usize; const uws_res = opaque {}; +extern fn uws_res_uncork(ssl: c_int, res: *uws_res) void; extern fn uws_res_end(ssl: c_int, res: *uws_res, data: [*c]const u8, length: usize, close_connection: bool) void; extern fn uws_res_pause(ssl: c_int, res: *uws_res) void; extern fn uws_res_resume(ssl: c_int, res: *uws_res) void; diff --git a/src/javascript/jsc/api/html_rewriter.zig b/src/javascript/jsc/api/html_rewriter.zig index 3ea438556..3814c7f8d 100644 --- a/src/javascript/jsc/api/html_rewriter.zig +++ b/src/javascript/jsc/api/html_rewriter.zig @@ -304,8 +304,8 @@ pub const HTMLRewriter = struct { defer if (!is_pending) input.detach(); if (input.needsToReadFile()) { - input.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global); - } else if (sink.runOutputSink(input.sharedView())) |error_value| { + input.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoadingWrap, global); + } else if (sink.runOutputSink(input.sharedView(), false)) |error_value| { return error_value; } @@ -316,11 +316,15 @@ pub const HTMLRewriter = struct { ); } + pub fn onFinishedLoadingWrap(sink: *anyopaque, bytes: anyerror![]u8) void { + onFinishedLoading(bun.cast(*BufferOutputSink, sink), bytes); + } + pub fn onFinishedLoading(sink: *BufferOutputSink, bytes: anyerror![]u8) void { var input = sink.input; defer input.detach(); const data = bytes catch |err| { - if (sink.response.body.value == .Locked and sink.response.body.value.Locked.task == sink) { + if (sink.response.body.value == .Locked and @ptrToInt(sink.response.body.value.Locked.task) == @ptrToInt(sink)) { sink.response.body.value = .{ .Empty = .{} }; } @@ -334,7 +338,7 @@ pub const HTMLRewriter = struct { } pub fn runOutputSink(sink: *BufferOutputSink, bytes: []const u8, is_async: bool) ?JSValue { - sink.bytes.growBy(bytes) catch unreachable; + sink.bytes.growBy(bytes.len) catch unreachable; var global = sink.global; var response = sink.response; sink.rewriter.write(bytes) catch { diff --git a/src/javascript/jsc/api/server.zig b/src/javascript/jsc/api/server.zig index 8c64c112a..7e2520b99 100644 --- a/src/javascript/jsc/api/server.zig +++ b/src/javascript/jsc/api/server.zig @@ -79,6 +79,12 @@ const IOTask = JSC.IOTask; const is_bindgen = JSC.is_bindgen; const uws = @import("uws"); +const SendfileContext = struct { + fd: i32, + remain: u32 = 0, + offset: i64 = 0, +}; + pub fn NewServer(comptime ssl_enabled: bool) type { return struct { const ThisServer = @This(); @@ -133,9 +139,17 @@ pub fn NewServer(comptime ssl_enabled: bool) type { blob: JSC.WebCore.Blob = JSC.WebCore.Blob{}, promise: ?*JSC.JSValue = null, response_headers: ?*JSC.WebCore.Headers.RefCountedHeaders = null, - + has_abort_handler: bool = false, + has_sendfile_ctx: bool = false, + sendfile: SendfileContext = undefined, pub threadlocal var pool: *RequestContextStackAllocator = undefined; + pub fn setAbortHandler(this: *RequestContext) void { + if (this.has_abort_handler) return; + this.has_abort_handler = true; + this.resp.onAborted(*RequestContext, RequestContext.onAbort, this); + } + pub fn onResolve( ctx: *RequestContext, _: *JSC.JSGlobalObject, @@ -215,30 +229,206 @@ pub fn NewServer(comptime ssl_enabled: bool) type { this.server.request_pool_allocator.destroy(this); } + fn writeHeaders( + this: *RequestContext, + headers_: *Headers.RefCountedHeaders, + ) void { + var headers: *JSC.WebCore.Headers = headers_.get(); + if (headers.getHeaderIndex("content-length")) |index| { + headers.entries.orderedRemove(index); + } + defer headers_.deref(); + var entries = headers.entries.slice(); + const names = entries.items(.name); + const values = entries.items(.value); + + this.resp.writeHeaderInt("content-length", this.blob.size); + this.resp.writeHeaders(names, values, headers.buf.items); + } + + pub fn writeStatus(this: *RequestContext, status: u16) void { + var status_text_buf: [48]u8 = undefined; + + if (status == 302) { + this.resp.writeStatus("302 Found"); + } else { + this.resp.writeStatus(std.fmt.bufPrint(&status_text_buf, "{d} HM", .{status}) catch unreachable); + } + } + + fn cleanupAfterSendfile(this: *RequestContext) void { + this.resp.endWithoutBody(); + std.os.close(this.sendfile.fd); + this.sendfile = undefined; + this.finalize(); + } + + pub fn onSendfile(this: *RequestContext, amount_: c_ulong, response: *App.Response) callconv(.C) bool { + const amount = @minimum(@truncate(u32, amount_), this.sendfile.remain); + + if (amount == 0 or this.aborted) { + this.cleanupAfterSendfile(); + return true; + } + + const adjusted_count_temporary = @minimum(amount, @as(u63, std.math.maxInt(i32))); + // TODO we should not need this int cast; improve the return type of `@minimum` + const adjusted_count = @intCast(u63, adjusted_count_temporary); + var sbytes: std.os.off_t = adjusted_count; + const signed_offset = @bitCast(i64, this.sendfile.offset); + + if (Environment.isLinux) { + const sent = @truncate( + u32, + std.os.linux.sendfile(response.getNativeHandle(), this.sendfile.fd, &this.sendfile.offset, amount), + ); + + this.sendfile.offset += sent; + this.sendfile.remain -= sent; + + if (sent == 0 or this.aborted or this.sendfile.remain == 0) { + this.cleanupAfterSendfile(); + return false; + } + } else { + const errcode = std.c.getErrno(std.c.sendfile( + this.sendfile.fd, + response.getNativeHandle(), + + signed_offset, + &sbytes, + null, + 0, + )); + this.sendfile.offset += sbytes; + this.sendfile.remain -= if (errcode != .SUCCESS) @intCast(u32, sbytes) else 0; + if ((errcode != .AGAIN and errcode != .SUCCESS) or this.aborted or this.sendfile.remain == 0) { + if (errcode != .AGAIN and errcode != .SUCCESS) { + Output.prettyErrorln("Error: {s}", .{@tagName(errcode)}); + Output.flush(); + } + this.cleanupAfterSendfile(); + return false; + } + } + + this.resp.onWritable(*RequestContext, onSendfile, this); + return true; + } + + pub fn onWritablePrepareSendfile(this: *RequestContext, _: c_ulong, _: *App.Response) callconv(.C) bool { + this.renderSendFile(this.blob); + return true; + } + + pub fn onPrepareSendfileWrap(this: *anyopaque, fd: i32, size: anyerror!u32, _: *JSGlobalObject) void { + onPrepareSendfile(bun.cast(*RequestContext, this), fd, size); + } + + fn onPrepareSendfile(this: *RequestContext, fd: i32, size: anyerror!u32) void { + this.setAbortHandler(); + if (this.aborted) return; + const size_ = size catch { + this.req.setYield(true); + this.finalize(); + return; + }; + this.blob.size = size_; + const code = this.response_ptr.?.statusCode(); + if (size_ == 0 and code >= 200 and code < 300) { + this.writeStatus(204); + } else { + this.writeStatus(code); + } + + if (this.response_ptr.?.body.init.headers) |headers_| { + this.writeHeaders(headers_); + } else { + this.resp.writeHeaderInt("content-length", size_); + } + + this.sendfile = .{ + .fd = fd, + .remain = size_, + }; + + if (size_ == 0) { + this.cleanupAfterSendfile(); + this.finalize(); + + return; + } + { + const wrote = std.os.write( + this.resp.getNativeHandle(), + "\r\n", + ) catch { + this.cleanupAfterSendfile(); + return; + }; + if (wrote == 0) { + this.cleanupAfterSendfile(); + return; + } + } + + // if we're not immediately writable, go ahead and try + if (this.sendfile.remain == size_) { + _ = this.onSendfile(size_, this.resp); + } + } + + pub fn renderSendFile(this: *RequestContext, blob: JSC.WebCore.Blob) void { + if (this.has_sendfile_ctx) return; + this.has_sendfile_ctx = true; + + JSC.WebCore.Blob.doOpenAndStatFile( + &this.blob, + *RequestContext, + this, + onPrepareSendfileWrap, + blob.globalThis, + ); + } + pub fn doRender(this: *RequestContext) void { if (this.aborted) { return; } var response = this.response_ptr.?; - this.blob = response.body.use(); - const status = response.statusCode(); + var body = &response.body; - if (response.body.init.headers) |headers_| { - var headers: *JSC.WebCore.Headers = headers_.get(); - defer headers_.deref(); - var entries = headers.entries.slice(); - const names = entries.items(.name); - const values = entries.items(.value); - - var status_text_buf: [48]u8 = undefined; - - if (status == 302) { - this.resp.writeStatus("302 Found"); - } else { - this.resp.writeStatus(std.fmt.bufPrint(&status_text_buf, "{d} HM", .{response.body.init.status_code}) catch unreachable); + if (body.value == .Error) { + this.resp.writeStatus("500 Internal Server Error"); + this.resp.writeHeader("content-type", "text/plain"); + this.resp.endWithoutBody(); + JSC.VirtualMachine.vm.defaultErrorHandler(body.value.Error, null); + body.value = JSC.WebCore.Body.Value.empty; + this.finalize(); + return; + } + + if (body.value == .Blob) { + if (body.value.Blob.needsToReadFile()) { + this.blob = response.body.use(); + this.req.setYield(false); + this.setAbortHandler(); + this.resp.onWritable(*RequestContext, onWritablePrepareSendfile, this); + if (!this.has_sendfile_ctx) this.renderSendFile(this.blob); + return; } + } + + this.renderBytes(response); + } - this.resp.writeHeaders(names, values, headers.buf.items); + pub fn renderBytes(this: *RequestContext, response: *JSC.WebCore.Response) void { + const status = response.statusCode(); + + this.writeStatus(status); + + if (response.body.init.headers) |headers_| { + this.writeHeaders(headers_); } if (status == 302 or status == 202 or this.blob.size == 0) { @@ -253,8 +443,8 @@ pub fn NewServer(comptime ssl_enabled: bool) type { pub fn render(this: *RequestContext, response: *JSC.WebCore.Response) void { this.response_ptr = response; - this.resp.runCorked(*RequestContext, doRender, this); - this.response_ptr = null; + // this.resp.runCorked(*RequestContext, doRender, this); + this.doRender(); } }; @@ -292,7 +482,7 @@ pub fn NewServer(comptime ssl_enabled: bool) type { } if (ctx.response_jsvalue.jsTypeLoose() == .JSPromise) { - resp.onAborted(*RequestContext, RequestContext.onAbort, ctx); + ctx.setAbortHandler(); JSC.VirtualMachine.vm.tick(); ctx.response_jsvalue.then( diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig index 5c4c1e5ca..d2ba2490a 100644 --- a/src/javascript/jsc/javascript.zig +++ b/src/javascript/jsc/javascript.zig @@ -298,6 +298,7 @@ pub fn IOTask(comptime Context: type) type { const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask; const BunTimerTimeoutTask = Bun.Timer.Timeout.TimeoutTask; const ReadFileTask = WebCore.Blob.Store.ReadFile.ReadFileTask; +const OpenAndStatFileTask = WebCore.Blob.Store.OpenAndStatFile.OpenAndStatFileTask; // const PromiseTask = JSInternalPromise.Completion.PromiseTask; pub const Task = TaggedPointerUnion(.{ FetchTasklet, @@ -305,6 +306,7 @@ pub const Task = TaggedPointerUnion(.{ AsyncTransformTask, BunTimerTimeoutTask, ReadFileTask, + OpenAndStatFileTask, // PromiseTask, // TimeoutTasklet, }); @@ -533,6 +535,11 @@ pub const VirtualMachine = struct { transform_task.*.runFromJS(); finished += 1; }, + @field(Task.Tag, @typeName(OpenAndStatFileTask)) => { + var transform_task: *OpenAndStatFileTask = task.get(OpenAndStatFileTask).?; + transform_task.*.runFromJS(); + finished += 1; + }, else => unreachable, } } diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig index bdecc6df4..dcd8a06d6 100644 --- a/src/javascript/jsc/webcore/response.zig +++ b/src/javascript/jsc/webcore/response.zig @@ -390,7 +390,7 @@ pub const Response = struct { return default.value; }, - .Used, .Locked, .Empty => return default.value, + .Used, .Locked, .Empty, .Error => return default.value, } } @@ -1806,6 +1806,211 @@ pub const Blob = struct { return try Blob.Store.init(list.items, allocator); } + const AsyncIO = HTTPClient.NetworkThread.AsyncIO; + + pub fn FileOpenerMixin(comptime This: type) type { + return struct { + const open_flags = std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC; + + pub fn getFdMac(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + var buf: [bun.MAX_PATH_BYTES]u8 = undefined; + this.opened_fd = AsyncIO.openSync( + this.file_store.pathlike.path.sliceZ(&buf), + open_flags, + ) catch |err| { + this.errno = err; + return err; + }; + return this.opened_fd; + } + + pub fn getFd(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + if (this.opened_fd != 0) { + return this.opened_fd; + } + + if (comptime Environment.isMac) { + return try this.getFdMac(); + } else { + return try this.getFdLinux(); + } + } + + pub fn getFdLinux(this: *This) AsyncIO.OpenError!JSC.Node.FileDescriptor { + var aio = &AsyncIO.global; + + var buf: [bun.MAX_PATH_BYTES]u8 = undefined; + aio.open( + *This, + this, + onOpen, + &this.open_completion, + this.file_store.pathlike.path.sliceZ(&buf), + open_flags, + 0, + ); + + suspend { + this.open_frame = @frame().*; + } + + if (this.errno) |errno| { + return @errSetCast(AsyncIO.OpenError, errno); + } + + return this.opened_fd; + } + + pub fn onOpen(this: *This, _: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!JSC.Node.FileDescriptor) void { + this.opened_fd = result catch |err| { + this.errno = err; + if (comptime Environment.isLinux) resume this.open_frame; + return; + }; + + if (comptime Environment.isLinux) resume this.open_frame; + } + }; + } + + pub fn FileCloserMixin(comptime This: type) type { + return struct { + pub fn doClose(this: *This) AsyncIO.CloseError!void { + var aio = &AsyncIO.global; + + aio.close( + *This, + this, + onClose, + &this.close_completion, + this.opened_fd, + ); + this.opened_fd = 0; + + suspend { + this.close_frame = @frame().*; + } + if (@hasField(This, "errno")) { + if (this.errno) |errno| { + return @errSetCast(AsyncIO.CloseError, errno); + } + } + } + + pub fn onClose(this: *This, _: *HTTPClient.NetworkThread.Completion, result: AsyncIO.CloseError!void) void { + result catch |err| { + if (@hasField(This, "errno")) { + this.errno = err; + } + resume this.close_frame; + return; + }; + + resume this.close_frame; + } + }; + } + + pub const OpenAndStatFile = struct { + const OpenFrameType = if (Environment.isMac) + void + else + @Frame(OpenAndStatFile.getFdLinux); + + open_frame: OpenFrameType = undefined, + errno: ?anyerror = null, + open_completion: HTTPClient.NetworkThread.Completion = undefined, + opened_fd: JSC.Node.FileDescriptor = undefined, + size: u32 = 0, + + store: *Store = undefined, + file_store: FileStore, + + onCompleteCtx: *anyopaque = undefined, + onCompleteCallback: OnCompleteCallback = undefined, + runAsyncFrame: @Frame(OpenAndStatFile.runAsync) = undefined, + + task: HTTPClient.NetworkThread.Task = undefined, + + pub const OnCompleteCallback = fn ( + ctx: *anyopaque, + fd: JSC.Node.FileDescriptor, + size: anyerror!u32, + global: *JSGlobalObject, + ) void; + + pub usingnamespace FileOpenerMixin(OpenAndStatFile); + pub usingnamespace FileCloserMixin(OpenAndStatFile); + + pub fn createWithCtx( + allocator: std.mem.Allocator, + store: *Store, + ctx: *anyopaque, + onCompleteCallback: OnCompleteCallback, + ) !*OpenAndStatFile { + var read_file = try allocator.create(OpenAndStatFile); + read_file.* = OpenAndStatFile{ + .file_store = store.data.file, + + .store = store, + .onCompleteCtx = ctx, + .onCompleteCallback = onCompleteCallback, + }; + store.ref(); + return read_file; + } + + pub const OpenAndStatFileTask = JSC.IOTask(@This()); + + pub fn run(this: *OpenAndStatFile, task: *OpenAndStatFileTask) void { + this.runAsyncFrame = async this.runAsync(task); + } + + pub fn then(this: *OpenAndStatFile, globalThis: *JSC.JSGlobalObject) void { + var cb = this.onCompleteCallback; + var cb_ctx = this.onCompleteCtx; + const fd = this.opened_fd; + const _size = this.size; + const errno = this.errno; + this.store.deref(); + + bun.default_allocator.destroy(this); + if (errno) |err| { + cb(cb_ctx, fd, err, globalThis); + } else { + cb(cb_ctx, fd, _size, globalThis); + } + } + + pub fn runAsync(this: *OpenAndStatFile, task: *OpenAndStatFileTask) void { + defer task.onFinish(); + this.opened_fd = 0; + if (this.file_store.pathlike == .fd) { + this.opened_fd = this.file_store.pathlike.fd; + } + const fd = + if (this.opened_fd == 0) + this.getFd() catch return + else + this.opened_fd; + + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + this.errno = AsyncIO.asError(err.errno); + return; + }, + }; + + if (!std.os.S.ISREG(stat.mode)) { + this.errno = error.ENOTSUP; + return; + } + + this.size = @truncate(u32, @intCast(u64, @maximum(@intCast(i64, stat.size), 0))); + } + }; + pub const ReadFile = struct { const OpenFrameType = if (Environment.isMac) void @@ -1831,18 +2036,19 @@ pub const Blob = struct { close_completion: HTTPClient.NetworkThread.Completion = undefined, task: HTTPClient.NetworkThread.Task = undefined, - onReadFileCompleteCtx: *anyopaque = undefined, - onReadFileComplete: OnReadFileCallback = undefined, + onCompleteCtx: *anyopaque = undefined, + onCompleteCallback: OnReadFileCallback = undefined, pub const OnReadFileCallback = fn (ctx: *anyopaque, bytes: anyerror![]u8) void; - const AsyncIO = HTTPClient.NetworkThread.AsyncIO; + pub usingnamespace FileOpenerMixin(ReadFile); + pub usingnamespace FileCloserMixin(ReadFile); pub fn createWithCtx( allocator: std.mem.Allocator, store: *Store, onReadFileContext: *anyopaque, - onReadFileComplete: OnReadFileCallback, + onCompleteCallback: OnReadFileCallback, off: u32, max_len: u32, ) !*ReadFile { @@ -1852,8 +2058,8 @@ pub const Blob = struct { .offset = off, .max_length = max_len, .store = store, - .onReadFileCompleteCtx = onReadFileContext, - .onReadFileComplete = onReadFileComplete, + .onCompleteCtx = onReadFileContext, + .onCompleteCallback = onCompleteCallback, }; store.ref(); return read_file; @@ -1877,54 +2083,6 @@ pub const Blob = struct { return try ReadFile.createWithCtx(allocator, store, @ptrCast(*anyopaque, context), Handler.run, off, max_len); } - pub fn getFdMac(this: *ReadFile) AsyncIO.OpenError!JSC.Node.FileDescriptor { - var buf: [bun.MAX_PATH_BYTES]u8 = undefined; - this.opened_fd = AsyncIO.openSync( - this.file_store.pathlike.path.sliceZ(&buf), - std.os.O.RDONLY, - ) catch |err| { - this.errno = err; - return err; - }; - return this.opened_fd; - } - - pub fn getFd(this: *ReadFile) AsyncIO.OpenError!JSC.Node.FileDescriptor { - if (this.file_store.pathlike == .fd) { - return this.file_store.pathlike.fd; - } - - if (comptime Environment.isMac) { - return try this.getFdMac(); - } else { - return try this.getFdLinux(); - } - } - - pub fn getFdLinux(this: *ReadFile) AsyncIO.OpenError!JSC.Node.FileDescriptor { - var aio = &AsyncIO.global; - - aio.open( - *ReadFile, - this, - onOpen, - &this.open_completion, - this.file_store.pathlike.path.sliceZ(), - std.os.O.RDONLY, - 0, - ); - - suspend { - this.open_frame = @frame().*; - } - - if (this.errno) |errno| { - return @errSetCast(AsyncIO.OpenError, errno); - } - - return this.opened_fd; - } - pub fn doRead(this: *ReadFile) AsyncIO.ReadError!u32 { var aio = &AsyncIO.global; @@ -1951,32 +2109,11 @@ pub const Blob = struct { return this.read_len; } - pub fn doClose(this: *ReadFile) AsyncIO.CloseError!void { - var aio = &AsyncIO.global; - - aio.close( - *ReadFile, - this, - onClose, - &this.close_completion, - this.opened_fd, - ); - this.opened_fd = 0; - - suspend { - this.close_frame = @frame().*; - } - - if (this.errno) |errno| { - return @errSetCast(AsyncIO.CloseError, errno); - } - } - pub const ReadFileTask = JSC.IOTask(@This()); pub fn then(this: *ReadFile, _: *JSC.JSGlobalObject) void { - var cb = this.onReadFileComplete; - var cb_ctx = this.onReadFileCompleteCtx; + var cb = this.onCompleteCallback; + var cb_ctx = this.onCompleteCtx; var store = this.store orelse { var _err = this.errno orelse error.MissingData; @@ -2017,16 +2154,6 @@ pub const Blob = struct { this.runAsyncFrame = async this.runAsync(task); } - pub fn onOpen(this: *ReadFile, _: *HTTPClient.NetworkThread.Completion, result: AsyncIO.OpenError!JSC.Node.FileDescriptor) void { - this.opened_fd = result catch |err| { - this.errno = err; - if (comptime Environment.isLinux) resume this.open_frame; - return; - }; - - if (comptime Environment.isLinux) resume this.open_frame; - } - pub fn onRead(this: *ReadFile, _: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void { this.read_len = @truncate(u32, result catch |err| { this.errno = err; @@ -2038,18 +2165,11 @@ pub const Blob = struct { resume this.read_frame; } - pub fn onClose(this: *ReadFile, _: *HTTPClient.NetworkThread.Completion, result: AsyncIO.CloseError!void) void { - result catch |err| { - this.errno = err; - resume this.close_frame; - return; - }; - - resume this.close_frame; - } - pub fn runAsync(this: *ReadFile, task: *ReadFileTask) void { defer task.onFinish(); + if (this.file_store.pathlike == .fd) { + this.opened_fd = this.file_store.pathlike.fd; + } const fd = this.getFd() catch return; const needs_close = this.file_store.pathlike == .path; @@ -2591,10 +2711,8 @@ pub const Blob = struct { pub fn NewInternalReadFileHandler(comptime Context: type, comptime Function: anytype) type { return struct { - context: Context, - pub fn run(handler: *anyopaque, bytes_: anyerror![]u8) void { - Function(bun.cast(Context, handler.context), bytes_); + Function(bun.cast(Context, handler), bytes_); } }; } @@ -2603,13 +2721,23 @@ pub const Blob = struct { var file_read = Store.ReadFile.createWithCtx( bun.default_allocator, this.store.?, + ctx, + Function, this.offset, this.size, - Handler, + ) catch unreachable; + var read_file_task = Store.ReadFile.ReadFileTask.createOnJSThread(bun.default_allocator, global, file_read) catch unreachable; + read_file_task.schedule(); + } + + pub fn doOpenAndStatFile(this: *Blob, comptime Handler: type, ctx: Handler, comptime Function: anytype, global: *JSGlobalObject) void { + var file_read = Store.OpenAndStatFile.createWithCtx( + bun.default_allocator, + this.store.?, ctx, Function, ) catch unreachable; - var read_file_task = Store.ReadFile.ReadFileTask.createOnJSThread(bun.default_allocator, global, file_read) catch unreachable; + var read_file_task = Store.OpenAndStatFile.OpenAndStatFileTask.createOnJSThread(bun.default_allocator, global, file_read) catch unreachable; read_file_task.schedule(); } @@ -3208,7 +3336,7 @@ pub const Body = struct { } pub fn toErrorInstance(this: *Value, error_instance: JSC.JSValue, global: *JSGlobalObject) void { - if (this.value == .Locked) { + if (this.* == .Locked) { var locked = this.Locked; locked.deinit = true; if (locked.promise) |promise| { @@ -3236,7 +3364,7 @@ pub const Body = struct { bun.default_allocator, "Error reading file {s}", .{@errorName(err)}, - )); + ) catch unreachable); error_str.mark(); var error_instance = error_str.toErrorInstance(global); return this.toErrorInstance(error_instance, global); @@ -3255,7 +3383,7 @@ pub const Body = struct { } if (tag == .Error) { - JSC.C.JSValueUnprotect(VirtualMachine.vm.global.vm(), this.Error.asObjectRef()); + JSC.C.JSValueUnprotect(VirtualMachine.vm.global.ref(), this.Error.asObjectRef()); } } @@ -3383,7 +3511,7 @@ pub const Request = struct { return MimeType.other.value; }, - .Used, .Locked, .Empty => return MimeType.other.value, + .Error, .Used, .Locked, .Empty => return MimeType.other.value, } } |