diff options
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- | src/bun.js/api/bun/socket.zig | 4 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 286 |
2 files changed, 122 insertions, 168 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index 5980de6ec..8b78cf035 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -1476,8 +1476,8 @@ fn NewSocket(comptime ssl: bool) type { globalObject.throw("Only Blob/buffered bodies are supported for now", .{}); return .{ .fail = {} }; } else if (args.ptr[0].as(JSC.WebCore.Request)) |request| { - request.body.toBlobIfPossible(); - if (request.body.tryUseAsAnyBlob()) |blob| { + request.body.value.toBlobIfPossible(); + if (request.body.value.tryUseAsAnyBlob()) |blob| { break :getter blob; } diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 4662ad0bc..f8be42e79 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -941,7 +941,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp needs_content_length: bool = false, needs_content_range: bool = false, sendfile: SendfileContext = undefined, - request_js_object: JSC.C.JSObjectRef = null, + request_body: ?*JSC.WebCore.BodyValueRef = null, request_body_buf: std.ArrayListUnmanaged(u8) = .{}, request_body_content_len: usize = 0, @@ -983,17 +983,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const result = arguments.ptr[0]; result.ensureStillAlive(); - if (ctx.request_js_object != null and ctx.signal == null) { - var request_js = ctx.request_js_object.?.value(); - request_js.ensureStillAlive(); - if (request_js.as(Request)) |request_object| { - if (request_object.signal) |signal| { - ctx.signal = signal; - _ = signal.ref(); - } - } - } - ctx.pending_promises_for_abort -|= 1; if (ctx.aborted) { ctx.finalizeForAbort(); @@ -1042,17 +1031,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp var ctx = arguments.ptr[1].asPromisePtr(@This()); const err = arguments.ptr[0]; - if (ctx.request_js_object != null and ctx.signal == null) { - var request_js = ctx.request_js_object.?.value(); - request_js.ensureStillAlive(); - if (request_js.as(Request)) |request_object| { - if (request_object.signal) |signal| { - ctx.signal = signal; - _ = signal.ref(); - } - } - } - ctx.pending_promises_for_abort -|= 1; if (ctx.aborted) { @@ -1273,11 +1251,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return false; } - if (this.request_js_object) |obj| { - if (obj.value().as(Request)) |req| { - if (req.body == .Locked) { - return false; - } + if (this.request_body) |body| { + if (body.value == .Locked) { + return false; } } @@ -1317,27 +1293,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // if we cannot, we have to reject pending promises // first, we reject the request body promise - if (this.request_js_object != null) { - var request_js = this.request_js_object.?.value(); - request_js.ensureStillAlive(); - - this.request_js_object = null; - defer request_js.ensureStillAlive(); - defer JSC.C.JSValueUnprotect(this.server.globalThis, request_js.asObjectRef()); + if (this.request_body) |body| { // User called .blob(), .json(), text(), or .arrayBuffer() on the Request object // but we received nothing or the connection was aborted - if (request_js.as(Request)) |req| { + if (body.value == .Locked) { // the promise is pending - if (req.body == .Locked and (req.body.Locked.action != .none or req.body.Locked.promise != null)) { + if (body.value.Locked.action != .none or body.value.Locked.promise != null) { this.pending_promises_for_abort += 1; - req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); - } else if (req.body == .Locked and (req.body.Locked.readable != null)) { - req.body.Locked.readable.?.abort(this.server.globalThis); - req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); - req.body.Locked.readable = null; + body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + } else if (body.value.Locked.readable != null) { + body.value.Locked.readable.?.abort(this.server.globalThis); + body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); + body.value.Locked.readable = null; } - req.uws_request = null; } } @@ -1399,23 +1368,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp _ = signal.unref(); } - if (this.request_js_object != null) { - ctxLog("finalizeWithoutDeinit: request_js_object != null", .{}); - - var request_js = this.request_js_object.?.value(); - request_js.ensureStillAlive(); - - this.request_js_object = null; - defer request_js.ensureStillAlive(); - defer JSC.C.JSValueUnprotect(this.server.globalThis, request_js.asObjectRef()); + if (this.request_body) |body| { + ctxLog("finalizeWithoutDeinit: request_body != null", .{}); // User called .blob(), .json(), text(), or .arrayBuffer() on the Request object // but we received nothing or the connection was aborted - if (request_js.as(Request)) |req| { - // the promise is pending - if (req.body == .Locked and req.body.Locked.action != .none and req.body.Locked.promise != null) { - req.body.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); - } - req.uws_request = null; + // the promise is pending + if (body.value == .Locked and body.value.Locked.action != .none and body.value.Locked.promise != null) { + body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis); } } @@ -1461,6 +1420,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.request_body_buf.clearAndFree(this.allocator); this.response_buf_owned.clearAndFree(this.allocator); + if (this.request_body) |body| { + _ = body.unref(); + this.request_body = null; + } server.request_pool_allocator.destroy(this); } @@ -2037,12 +2000,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp switch (promise.status(vm.global.vm())) { .Pending => {}, .Fulfilled => { - if (ctx.signal == null) { - if (request_object.signal) |signal| { - ctx.signal = signal; - _ = signal.ref(); - } - } const fulfilled_value = promise.result(vm.global.vm()); // if you return a Response object or a Promise<Response> @@ -2085,12 +2042,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return; }, .Rejected => { - if (ctx.signal == null) { - if (request_object.signal) |signal| { - ctx.signal = signal; - _ = signal.ref(); - } - } ctx.handleReject(promise.result(vm.global.vm())); return; }, @@ -2661,84 +2612,80 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.aborted) return; - const request = JSC.JSValue.fromRef(this.request_js_object); - var req = request.as(Request) orelse { - this.request_body_buf.clearAndFree(this.allocator); - return; - }; + if (this.request_body != null) { + var body = this.request_body.?; - if (req.body == .Locked) { - if (req.body.Locked.readable) |readable| { - if (readable.ptr == .Bytes) { - std.debug.assert(this.request_body_buf.items.len == 0); + if (body.value == .Locked) { + if (body.value.Locked.readable) |readable| { + if (readable.ptr == .Bytes) { + std.debug.assert(this.request_body_buf.items.len == 0); - if (!last) { - readable.ptr.Bytes.onData( - .{ - .temporary = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - } else { - readable.ptr.Bytes.onData( - .{ - .temporary_and_done = bun.ByteList.initConst(chunk), - }, - bun.default_allocator, - ); - } + if (!last) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } - return; + return; + } } } - } - if (last) { - request.ensureStillAlive(); - var bytes = this.request_body_buf; - defer this.request_body_buf = .{}; - var old = req.body; + if (last) { + var bytes = this.request_body_buf; + defer this.request_body_buf = .{}; + var old = body.value; + + const total = bytes.items.len + chunk.len; + getter: { + // if (total <= JSC.WebCore.InlineBlob.available_bytes) { + // if (total == 0) { + // body.value = .{ .Empty = {} }; + // break :getter; + // } + + // body.value = .{ .InlineBlob = JSC.WebCore.InlineBlob.concat(bytes.items, chunk) }; + // this.request_body_buf.clearAndFree(this.allocator); + // } else { + bytes.ensureTotalCapacityPrecise(this.allocator, total) catch |err| { + this.request_body_buf.clearAndFree(this.allocator); + body.value.toError(err, this.server.globalThis); + break :getter; + }; - const total = bytes.items.len + chunk.len; - getter: { - // if (total <= JSC.WebCore.InlineBlob.available_bytes) { - // if (total == 0) { - // req.body = .{ .Empty = {} }; - // break :getter; - // } - - // req.body = .{ .InlineBlob = JSC.WebCore.InlineBlob.concat(bytes.items, chunk) }; - // this.request_body_buf.clearAndFree(this.allocator); - // } else { - bytes.ensureTotalCapacityPrecise(this.allocator, total) catch |err| { - this.request_body_buf.clearAndFree(this.allocator); - req.body.toError(err, this.server.globalThis); - break :getter; - }; + const prev_len = bytes.items.len; + bytes.items.len = total; + var slice = bytes.items[prev_len..]; + @memcpy(slice.ptr, chunk.ptr, chunk.len); + body.value = .{ + .InternalBlob = .{ + .bytes = bytes.toManaged(this.allocator), + }, + }; + // } + } - const prev_len = bytes.items.len; - bytes.items.len = total; - var slice = bytes.items[prev_len..]; - @memcpy(slice.ptr, chunk.ptr, chunk.len); - req.body = .{ - .InternalBlob = .{ - .bytes = bytes.toManaged(this.allocator), - }, - }; - // } + if (old == .Locked) { + old.resolve(&body.value, this.server.globalThis); + } + return; } - if (old == .Locked) { - old.resolve(&req.body, this.server.globalThis); + if (this.request_body_buf.capacity == 0) { + this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @min(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer"); } - request.unprotect(); - return; + this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); } - - if (this.request_body_buf.capacity == 0) { - this.request_body_buf.ensureTotalCapacityPrecise(this.allocator, @min(this.request_body_content_len, max_request_body_preallocate_length)) catch @panic("Out of memory while allocating request body buffer"); - } - this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); } pub fn onStartStreamingRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { @@ -2771,22 +2718,20 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const max_request_body_preallocate_length = 1024 * 256; pub fn onStartBuffering(this: *RequestContext) void { ctxLog("onStartBuffering", .{}); - const request = JSC.JSValue.c(this.request_js_object); - request.ensureStillAlive(); // TODO: check if is someone calling onStartBuffering other than onStartBufferingCallback // if is not, this should be removed and only keep protect + setAbortHandler if (this.is_transfer_encoding == false and this.request_body_content_len == 0) { // no content-length or 0 content-length // no transfer-encoding - if (request.as(Request)) |req| { - var old = req.body; + if (this.request_body != null) { + var body = this.request_body.?; + var old = body.value; old.Locked.onReceiveValue = null; - req.body = .{ .Null = {} }; - old.resolve(&req.body, this.server.globalThis); + var new_body = .{ .Null = {} }; + old.resolve(&new_body, this.server.globalThis); + body.value = new_body; } } else { - // we need to buffer the request body - request.protect(); this.setAbortHandler(); } } @@ -4201,6 +4146,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { config: ServerConfig = ServerConfig{}, pending_requests: usize = 0, request_pool_allocator: std.mem.Allocator = undefined, + listen_callback: JSC.AnyTask = undefined, allocator: std.mem.Allocator, poll_ref: JSC.PollRef = .{}, @@ -4596,7 +4542,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { existing_request = Request{ .url = url.href, .headers = headers, - .body = body, + .body = JSC.WebCore.InitRequestBodyValue(body) catch unreachable, .method = method, }; } @@ -4948,15 +4894,22 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { var ctx = this.request_pool_allocator.create(RequestContext) catch @panic("ran out of memory"); ctx.create(this, req, resp); var request_object = this.allocator.create(JSC.WebCore.Request) catch unreachable; + var body = JSC.WebCore.InitRequestBodyValue(.{ .Null = {} }) catch unreachable; + + ctx.request_body = body; + const js_signal = JSC.WebCore.AbortSignal.create(this.globalThis); + js_signal.ensureStillAlive(); + if (JSC.WebCore.AbortSignal.fromJS(js_signal)) |signal| { + ctx.signal = signal.ref().ref(); // +2 refs 1 for the request and 1 for the request context + } request_object.* = .{ .url = "", .method = ctx.method, .uws_request = req, .https = ssl_enabled, - .body = .{ - .Null = {}, - }, + .signal = ctx.signal, + .body = body.ref(), }; if (comptime debug_mode) { @@ -4996,7 +4949,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { // we defer pre-allocating the body until we receive the first chunk // that way if the client is lying about how big the body is or the client aborts // we don't waste memory - request_object.body = .{ + ctx.request_body.?.value = .{ .Locked = .{ .task = ctx, .global = this.globalThis, @@ -5005,14 +4958,6 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { }, }; resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx); - } else if (request_object.body == .Locked) { - // This branch should never be taken, but we are handling it anyway. - var old = request_object.body; - old.Locked.onReceiveValue = null; - request_object.body = .{ .Null = {} }; - old.resolve(&request_object.body, this.globalThis); - } else { - request_object.body = .{ .Null = {} }; } } @@ -5021,15 +4966,12 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { request_object.toJS(this.globalThis), this.thisObject, }; - ctx.request_js_object = args[0].asObjectRef(); + const request_value = args[0]; request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); - if (request_object.signal) |signal| { - ctx.signal = signal; - _ = signal.ref(); - } + ctx.onResponse( this, req, @@ -5037,6 +4979,8 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { request_value, response_value, ); + // uWS request will not live longer than this function + request_object.uws_request = null; } pub fn onWebSocketUpgrade( @@ -5052,15 +4996,23 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { var ctx = this.request_pool_allocator.create(RequestContext) catch @panic("ran out of memory"); ctx.create(this, req, resp); var request_object = this.allocator.create(JSC.WebCore.Request) catch unreachable; + var body = JSC.WebCore.InitRequestBodyValue(.{ .Null = {} }) catch unreachable; + + ctx.request_body = body; + const js_signal = JSC.WebCore.AbortSignal.create(this.globalThis); + js_signal.ensureStillAlive(); + if (JSC.WebCore.AbortSignal.fromJS(js_signal)) |signal| { + ctx.signal = signal.ref().ref(); // +2 refs 1 for the request and 1 for the request context + } + request_object.* = .{ .url = "", .method = ctx.method, .uws_request = req, .upgrader = ctx, .https = ssl_enabled, - .body = .{ - .Null = {}, - }, + .signal = ctx.signal, + .body = body.ref(), }; ctx.upgrade_context = upgrade_ctx; @@ -5069,7 +5021,6 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { request_object.toJS(this.globalThis), this.thisObject, }; - ctx.request_js_object = args[0].asObjectRef(); const request_value = args[0]; request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); @@ -5081,6 +5032,9 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { request_value, response_value, ); + + // uWS request will not live longer than this function + request_object.uws_request = null; } pub fn listen(this: *ThisServer) void { |