diff options
Diffstat (limited to 'src/bun.js')
m--------- | src/bun.js/WebKit | 0 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 295 | ||||
-rw-r--r-- | src/bun.js/bindings/AsyncContextFrame.cpp | 8 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 14 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 49 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 4 | ||||
-rw-r--r-- | src/bun.js/webcore/body.zig | 19 | ||||
-rw-r--r-- | src/bun.js/webcore/request.zig | 20 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 108 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 125 |
10 files changed, 478 insertions, 164 deletions
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit -Subproject fd79ce3120a692f4aed314c3da3dd452b4aa865 +Subproject 48c1316e907ca597e27e5a7624160dc18a4df8e diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 45c82b9fa..d6a9e1c6e 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1016,7 +1016,7 @@ fn NewFlags(comptime debug_mode: bool) type { is_transfer_encoding: bool = false, /// Used to identify if request can be safely deinitialized - is_waiting_body: bool = false, + is_waiting_for_request_body: bool = false, /// Used in renderMissing in debug mode to show the user an HTML page /// Used to avoid looking at the uws.Request struct after it's been freed is_web_browser_navigation: if (debug_mode) bool else void = if (debug_mode) false else {}, @@ -1080,9 +1080,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp /// When the response body is a temporary value response_buf_owned: std.ArrayListUnmanaged(u8) = .{}, + /// Defer finalization until after the request handler task is completed? + defer_deinit_until_callback_completes: ?*bool = null, + // TODO: support builtin compression const can_sendfile = !ssl_enabled; + pub inline fn isAsync(this: *const RequestContext) bool { + return this.defer_deinit_until_callback_completes == null; + } + + fn drainMicrotasks(this: *const RequestContext) void { + if (this.isAsync()) return; + this.server.vm.drainMicrotasks(); + } + pub fn setAbortHandler(this: *RequestContext) void { if (this.flags.has_abort_handler) return; if (this.resp) |resp| { @@ -1320,8 +1332,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.end(data, closeConnection); @@ -1331,8 +1343,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn endStream(this: *RequestContext, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.endStream(closeConnection); @@ -1342,8 +1354,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.endWithoutBody(closeConnection); @@ -1562,9 +1574,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // if we are waiting for the body yet and the request was not aborted we can safely clear the onData callback if (this.resp) |resp| { - if (this.flags.is_waiting_body and this.flags.aborted == false) { + if (this.flags.is_waiting_for_request_body and this.flags.aborted == false) { resp.clearOnData(); - this.flags.is_waiting_body = false; + this.flags.is_waiting_for_request_body = false; } } } @@ -1576,6 +1588,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn deinit(this: *RequestContext) void { + if (this.defer_deinit_until_callback_completes) |defer_deinit| { + defer_deinit.* = true; + ctxLog("deferred deinit <d> ({*})<r>", .{this}); + return; + } + ctxLog("deinit<d> ({*})<r>", .{this}); if (comptime Environment.allow_assert) std.debug.assert(this.flags.finalized); @@ -1953,6 +1971,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream }; + fn handleFirstStreamWrite(this: *@This()) void { + if (!this.flags.has_written_status) { + this.renderMetadata(); + } + } + fn doRenderStream(pair: *StreamPair) void { var this = pair.this; var stream = pair.stream; @@ -1963,20 +1987,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } const resp = this.resp.?; - // uWS automatically adds the status line if needed - // we want to batch network calls as much as possible - if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) { - this.renderMetadata(); - } - stream.value.ensureStillAlive(); var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; + var globalThis = this.server.globalThis; response_stream.* = ResponseStream.JSSink{ .sink = .{ .res = resp, .allocator = this.allocator, .buffer = bun.ByteList{}, + .onFirstWrite = @ptrCast(&handleFirstStreamWrite), + .ctx = this, + .globalThis = globalThis, }, }; var signal = &response_stream.sink.signal; @@ -1991,13 +2013,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // We are already corked! const assignment_result: JSValue = ResponseStream.JSSink.assignToStream( - this.server.globalThis, + globalThis, stream.value, response_stream, @as(**anyopaque, @ptrCast(&signal.ptr)), ); assignment_result.ensureStillAlive(); + // assert that it was updated std.debug.assert(!signal.isDead()); @@ -2015,32 +2038,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp response_stream.detach(); this.sink = null; response_stream.sink.destroy(); - stream.value.unprotect(); return this.handleReject(err_value); } - if (response_stream.sink.done or - // TODO: is there a condition where resp could be freed before done? - resp.hasResponded()) - { + if (resp.hasResponded()) { if (!this.flags.aborted) resp.clearAborted(); - const wrote_anything = response_stream.sink.wrote > 0; - streamLog("is done", .{}); - const responded = resp.hasResponded(); - + streamLog("done", .{}); response_stream.detach(); this.sink = null; response_stream.sink.destroy(); - if (!responded and !wrote_anything and !this.flags.aborted) { - this.renderMissing(); - return; - } else if (wrote_anything and !responded and !this.flags.aborted) { - this.endStream(this.shouldCloseConnection()); - } - + this.endStream(this.shouldCloseConnection()); this.finalize(); stream.value.unprotect(); - return; } @@ -2049,19 +2058,28 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // it returns a Promise when it goes through ReadableStreamDefaultReader if (assignment_result.asAnyPromise()) |promise| { streamLog("returned a promise", .{}); - switch (promise.status(this.server.globalThis.vm())) { + this.drainMicrotasks(); + + switch (promise.status(globalThis.vm())) { .Pending => { streamLog("promise still Pending", .{}); + if (!this.flags.has_written_status) { + response_stream.sink.onFirstWrite = null; + response_stream.sink.ctx = null; + this.renderMetadata(); + } + // TODO: should this timeout? this.setAbortHandler(); + this.pending_promises_for_abort += 1; this.response_ptr.?.body.value = .{ .Locked = .{ .readable = stream, - .global = this.server.globalThis, + .global = globalThis, }, }; assignment_result.then( - this.server.globalThis, + globalThis, this, onResolveStream, onRejectStream, @@ -2071,11 +2089,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Fulfilled => { streamLog("promise Fulfilled", .{}); + defer stream.value.unprotect(); + this.handleResolveStream(); }, .Rejected => { streamLog("promise Rejected", .{}); - this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm())); + defer stream.value.unprotect(); + + this.handleRejectStream(globalThis, promise.result(globalThis.vm())); }, } return; @@ -2084,22 +2106,23 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.flags.aborted) { response_stream.detach(); - stream.cancel(this.server.globalThis); - response_stream.sink.done = true; + stream.cancel(globalThis); + defer stream.value.unprotect(); + response_stream.sink.markDone(); this.finalizeForAbort(); response_stream.sink.finalize(); - stream.value.unprotect(); return; } stream.value.ensureStillAlive(); + defer stream.value.unprotect(); const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and response_stream.sink.buffer.len == 0); - if (!stream.isLocked(this.server.globalThis) and !is_in_progress) { - if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| { + if (!stream.isLocked(globalThis) and !is_in_progress) { + if (JSC.WebCore.ReadableStream.fromJS(stream.value, globalThis)) |comparator| { if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) { streamLog("is not locked", .{}); this.renderMissing(); @@ -2111,7 +2134,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.setAbortHandler(); streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); this.finalize(); - stream.value.unprotect(); } const streamLog = Output.scoped(.ReadableStream, false); @@ -2120,6 +2142,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return @intFromPtr(this.upgrade_context) == std.math.maxInt(usize); } + fn toAsyncWithoutAbortHandler(ctx: *RequestContext, req: *uws.Request, request_object: *Request) void { + request_object.uws_request = req; + + request_object.ensureURL() catch { + request_object.url = bun.String.empty; + }; + + // we have to clone the request headers here since they will soon belong to a different request + if (request_object.headers == null) { + request_object.headers = JSC.FetchHeaders.createFromUWS(ctx.server.globalThis, req); + } + + // This object dies after the stack frame is popped + // so we have to clear it in here too + request_object.uws_request = null; + } + + fn toAsync( + ctx: *RequestContext, + req: *uws.Request, + request_object: *Request, + ) void { + ctxLog("toAsync", .{}); + ctx.toAsyncWithoutAbortHandler(req, request_object); + if (comptime debug_mode) { + ctx.pathname = request_object.url.clone(); + } + ctx.setAbortHandler(); + } + + // Each HTTP request or TCP socket connection is effectively a "task". + // + // However, unlike the regular task queue, we don't drain the microtask + // queue at the end. + // + // Instead, we drain it multiple times, at the points that would + // otherwise "halt" the Response from being rendered. + // + // - If you return a Promise, we drain the microtask queue once + // - If you return a streaming Response, we drain the microtask queue (possibly the 2nd time this task!) pub fn onResponse( ctx: *RequestContext, this: *ThisServer, @@ -2128,8 +2190,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp request_value: JSValue, response_value: JSValue, ) void { + _ = request_object; + _ = req; request_value.ensureStillAlive(); response_value.ensureStillAlive(); + ctx.drainMicrotasks(); if (ctx.flags.aborted) { ctx.finalizeForAbort(); @@ -2159,6 +2224,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.response_jsvalue = response_value; ctx.response_jsvalue.ensureStillAlive(); ctx.flags.response_protected = false; + response.body.value.toBlobIfPossible(); switch (response.body.value) { @@ -2210,6 +2276,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.response_jsvalue.ensureStillAlive(); ctx.flags.response_protected = false; ctx.response_ptr = response; + response.body.value.toBlobIfPossible(); switch (response.body.value) { .Blob => |*blob| { @@ -2236,35 +2303,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (wait_for_promise) { - request_object.uws_request = req; - - request_object.ensureURL() catch { - request_object.url = bun.String.empty; - }; - - // we have to clone the request headers here since they will soon belong to a different request - if (request_object.headers == null) { - request_object.headers = JSC.FetchHeaders.createFromUWS(this.globalThis, req); - } - - if (comptime debug_mode) { - ctx.pathname = request_object.url.clone(); - } - - // This object dies after the stack frame is popped - // so we have to clear it in here too - request_object.uws_request = null; - - ctx.setAbortHandler(); ctx.pending_promises_for_abort += 1; - response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject); return; } - if (ctx.resp) |resp| { - // The user returned something that wasn't a promise or a promise with a response - if (!resp.hasResponded() and !ctx.flags.has_marked_pending) ctx.renderMissing(); - } } pub fn handleResolveStream(req: *RequestContext) void { @@ -2276,6 +2318,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp wrapper.sink.done = true; req.flags.aborted = req.flags.aborted or wrapper.sink.aborted; wrote_anything = wrapper.sink.wrote > 0; + wrapper.sink.finalize(); wrapper.detach(); req.sink = null; @@ -2300,12 +2343,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const responded = resp.hasResponded(); - if (!responded and !wrote_anything) { - resp.clearAborted(); - req.renderMissing(); - return; - } else if (!responded and wrote_anything) { + if (!responded) { resp.clearAborted(); + if (!req.flags.has_written_status) { + req.renderMetadata(); + } req.endStream(req.shouldCloseConnection()); } @@ -2316,6 +2358,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onResolveStream", .{}); var args = callframe.arguments(2); var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This()); + req.pending_promises_for_abort -|= 1; req.handleResolveStream(); return JSValue.jsUndefined(); } @@ -2323,19 +2366,19 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onRejectStream", .{}); const args = callframe.arguments(2); var req = args.ptr[args.len - 1].asPromisePtr(@This()); + req.pending_promises_for_abort -|= 1; var err = args.ptr[0]; req.handleRejectStream(globalThis, err); return JSValue.jsUndefined(); } pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void { + _ = globalThis; streamLog("handleRejectStream", .{}); - var wrote_anything = req.flags.has_written_status; if (req.sink) |wrapper| { wrapper.sink.pending_flush = null; wrapper.sink.done = true; - wrote_anything = wrote_anything or wrapper.sink.wrote > 0; req.flags.aborted = req.flags.aborted or wrapper.sink.aborted; wrapper.sink.finalize(); wrapper.detach(); @@ -2350,40 +2393,32 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } - streamLog("onReject({any})", .{wrote_anything}); - - //aborted so call finalizeForAbort + // aborted so call finalizeForAbort if (req.flags.aborted) { req.finalizeForAbort(); return; } - if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) { - req.response_jsvalue.unprotect(); - req.response_jsvalue = JSValue.zero; - req.handleReject(err); - return; - } else if (wrote_anything) { - req.endStream(true); - if (comptime debug_mode) { - if (!err.isEmptyOrUndefinedOrNull()) { - var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); - defer exception_list.deinit(); - req.server.vm.runErrorHandler(err, &exception_list); - } - } - req.finalize(); - return; + streamLog("onReject()", .{}); + + if (!req.flags.has_written_status) { + req.renderMetadata(); } - const fallback = JSC.SystemError{ - .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))), - .message = bun.String.static("Unhandled error in ReadableStream"), - }; - req.handleReject(fallback.toErrorInstance(globalThis)); + req.endStream(true); + if (comptime debug_mode) { + if (!err.isEmptyOrUndefinedOrNull()) { + var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); + defer exception_list.deinit(); + req.server.vm.runErrorHandler(err, &exception_list); + } + } + req.finalize(); } pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { + this.drainMicrotasks(); + // If a ReadableStream can trivially be converted to a Blob, do so. // If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob. value.toBlobIfPossible(); @@ -2833,8 +2868,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp std.debug.assert(this.resp == resp); - this.flags.is_waiting_body = last == false; + this.flags.is_waiting_for_request_body = last == false; if (this.flags.aborted or this.flags.has_marked_complete) return; + if (!last and chunk.len == 0) { + // Sometimes, we get back an empty chunk + // We have to ignore those chunks unless it's the last one + return; + } if (this.request_body != null) { var body = this.request_body.?; @@ -2900,6 +2940,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (old == .Locked) { + defer this.drainMicrotasks(); + old.resolve(&body.value, this.server.globalThis); } return; @@ -3254,7 +3296,7 @@ pub const WebSocketServer = struct { globalObject.throwInvalidArguments("websocket expects maxPayloadLength to be an integer", .{}); return null; } - server.maxPayloadLength = @as(u32, @intCast(@max(value.toInt64(), 0))); + server.maxPayloadLength = @truncate(@max(value.toInt64(), 0)); } } @@ -3265,7 +3307,7 @@ pub const WebSocketServer = struct { return null; } - var idleTimeout = @as(u16, @intCast(@as(u32, @truncate(@max(value.toInt64(), 0))))); + var idleTimeout: u16 = @truncate(@max(value.toInt64(), 0)); if (idleTimeout > 960) { globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{}); return null; @@ -3285,7 +3327,7 @@ pub const WebSocketServer = struct { return null; } - server.backpressureLimit = @as(u32, @intCast(@max(value.toInt64(), 0))); + server.backpressureLimit = @truncate(@max(value.toInt64(), 0)); } } @@ -5271,6 +5313,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp ) void { JSC.markBinding(@src()); this.pending_requests += 1; + req.setYield(false); var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory"); ctx.create(this, req, resp); @@ -5337,7 +5380,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp .onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback, }, }; - ctx.flags.is_waiting_body = true; + ctx.flags.is_waiting_for_request_body = true; + resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx); } } @@ -5352,7 +5396,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); + defer { + // uWS request will not live longer than this function + request_object.uws_request = null; + } + var should_deinit_context = false; + ctx.defer_deinit_until_callback_completes = &should_deinit_context; ctx.onResponse( this, req, @@ -5360,8 +5410,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value, response_value, ); - // uWS request will not live longer than this function - request_object.uws_request = null; + ctx.defer_deinit_until_callback_completes = null; + + if (should_deinit_context) { + request_object.uws_request = null; + ctx.deinit(); + return; + } + + if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) { + ctx.renderMissing(); + return; + } + + ctx.toAsync(req, request_object); } pub fn onWebSocketUpgrade( @@ -5404,7 +5466,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp const request_value = args[0]; request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); + defer { + // uWS request will not live longer than this function + request_object.uws_request = null; + } + var should_deinit_context = false; + ctx.defer_deinit_until_callback_completes = &should_deinit_context; ctx.onResponse( this, req, @@ -5412,9 +5480,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value, response_value, ); + ctx.defer_deinit_until_callback_completes = null; - // uWS request will not live longer than this function - request_object.uws_request = null; + if (should_deinit_context) { + request_object.uws_request = null; + ctx.deinit(); + return; + } + + if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) { + ctx.renderMissing(); + return; + } + + ctx.toAsync(req, request_object); } pub fn listen(this: *ThisServer) void { diff --git a/src/bun.js/bindings/AsyncContextFrame.cpp b/src/bun.js/bindings/AsyncContextFrame.cpp index 2a103a8d1..1c541b2a8 100644 --- a/src/bun.js/bindings/AsyncContextFrame.cpp +++ b/src/bun.js/bindings/AsyncContextFrame.cpp @@ -97,10 +97,18 @@ extern "C" EncodedJSValue AsyncContextFrame__withAsyncContextIfNeeded(JSGlobalOb // } JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args) { + if (LIKELY(!global->isAsyncContextTrackingEnabled())) { + return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args); + } + ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args); } JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args, NakedPtr<Exception>& returnedException) { + if (LIKELY(!global->isAsyncContextTrackingEnabled())) { + return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException); + } + ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException); } diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 0ecafeae4..d3bd623dd 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -1411,6 +1411,16 @@ JSC_DEFINE_HOST_FUNCTION(asyncHooksCleanupLater, (JSC::JSGlobalObject * globalOb return JSC::JSValue::encode(JSC::jsUndefined()); } +JSC_DEFINE_HOST_FUNCTION(asyncHooksSetEnabled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + // assumptions and notes: + // - nobody else uses setOnEachMicrotaskTick + // - this is called by js if we set async context in a way we may not clear it + // - AsyncLocalStorage.prototype.run cleans up after itself and does not call this cb + globalObject->setAsyncContextTrackingEnabled(callFrame->argument(0).toBoolean(globalObject)); + return JSC::JSValue::encode(JSC::jsUndefined()); +} + extern "C" int Bun__ttySetMode(int fd, int mode); JSC_DEFINE_HOST_FUNCTION(jsTTYSetMode, (JSC::JSGlobalObject * globalObject, CallFrame* callFrame)) @@ -1689,6 +1699,10 @@ static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad, if (string == "async_hooks"_s) { auto* obj = constructEmptyObject(globalObject); obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setAsyncHooksEnabled"_s)), + JSC::JSFunction::create(vm, globalObject, 0, "setAsyncHooksEnabled"_s, asyncHooksSetEnabled, ImplementationVisibility::Public), 0); + + obj->putDirect( vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "cleanupLater"_s)), JSC::JSFunction::create(vm, globalObject, 0, "cleanupLater"_s, asyncHooksCleanupLater, ImplementationVisibility::Public), 0); return JSValue::encode(obj); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 92874b6a4..896297060 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -509,6 +509,7 @@ comptime { } } +pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool); pub const EventLoop = struct { tasks: Queue = undefined, concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, @@ -518,6 +519,7 @@ pub const EventLoop = struct { start_server_on_next_tick: bool = false, defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), forever_timer: ?*uws.Timer = null, + deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{}, pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); const log = bun.Output.scoped(.EventLoop, false); @@ -528,6 +530,49 @@ pub const EventLoop = struct { } } + pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void { + vm.drainMicrotasks(); + this.drainDeferredTasks(); + } + + pub fn drainMicrotasks(this: *EventLoop) void { + this.drainMicrotasksWithVM(this.global.vm()); + } + + pub fn ensureAliveForOneTick(this: *EventLoop) void { + if (this.noop_task.scheduled) return; + this.enqueueTask(Task.init(&this.noop_task)); + this.noop_task.scheduled = true; + } + + pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool { + const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable; + return existing.found_existing; + } + + pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool { + return this.deferred_microtask_map.swapRemove(ctx); + } + + fn drainDeferredTasks(this: *EventLoop) void { + var i: usize = 0; + var last = this.deferred_microtask_map.count(); + while (i < last) { + var key = this.deferred_microtask_map.keys()[i] orelse { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + continue; + }; + + if (!this.deferred_microtask_map.values()[i](key)) { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + } else { + i += 1; + } + } + } + pub fn tickWithCount(this: *EventLoop) u32 { var global = this.global; var global_vm = global.vm(); @@ -621,7 +666,7 @@ pub const EventLoop = struct { } global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); } this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head; @@ -758,7 +803,7 @@ pub const EventLoop = struct { this.tickConcurrent(); } else { global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); this.tickConcurrent(); if (this.tasks.count > 0) continue; } diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index a016129e2..492b9fbee 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -1742,6 +1742,10 @@ pub const VirtualMachine = struct { ret.success = true; } + pub fn drainMicrotasks(this: *VirtualMachine) void { + this.eventLoop().drainMicrotasks(); + } + pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void { switch (log.msgs.items.len) { 0 => { diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index fa0ec9b24..86462dd04 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -249,10 +249,7 @@ pub const Body = struct { pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue { value.action = action; - if (value.readable) |readable| { - // switch (readable.ptr) { - // .JavaScript - // } + if (value.readable) |readable| handle_stream: { switch (action) { .getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer => { value.promise = switch (action) { @@ -261,6 +258,20 @@ pub const Body = struct { .getText => globalThis.readableStreamToText(readable.value), .getBlob => globalThis.readableStreamToBlob(readable.value), .getFormData => |form_data| brk: { + if (value.onStartBuffering != null) { + if (readable.isDisturbed(globalThis)) { + form_data.?.deinit(); + readable.value.unprotect(); + value.readable = null; + value.action = .{ .none = {} }; + return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{})); + } else { + readable.detach(globalThis); + value.readable = null; + } + + break :handle_stream; + } defer { form_data.?.deinit(); value.action.getFormData = null; diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index c01e72d60..aaa3f6b79 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -485,7 +485,8 @@ pub const Request = struct { _ = req.body.unref(); return null; }; - req.url = str; + req.url = str.dupeRef(); + if (!req.url.isEmpty()) fields.insert(.url); } else if (!url_or_object_type.isObject()) { @@ -554,7 +555,7 @@ pub const Request = struct { if (!fields.contains(.url)) { if (!response.url.isEmpty()) { - req.url = response.url; + req.url = response.url.dupeRef(); fields.insert(.url); } } @@ -586,7 +587,7 @@ pub const Request = struct { if (!fields.contains(.url)) { if (value.fastGet(globalThis, .url)) |url| { - req.url = bun.String.fromJS(url, globalThis); + req.url = bun.String.fromJS(url, globalThis).dupeRef(); if (!req.url.isEmpty()) fields.insert(.url); @@ -599,7 +600,7 @@ pub const Request = struct { _ = req.body.unref(); return null; }; - req.url = str; + req.url = str.dupeRef(); if (!req.url.isEmpty()) fields.insert(.url); } @@ -648,9 +649,10 @@ pub const Request = struct { return null; } - // Note that the string is going to be ref'd here, so we don't need to ref it above. const href = JSC.URL.hrefFromString(req.url); if (href.isEmpty()) { + // globalThis.throw can cause GC, which could cause the above string to be freed. + // so we must increment the reference count before calling it. globalThis.throw("Failed to construct 'Request': Invalid URL \"{}\"", .{ req.url, }); @@ -658,6 +660,14 @@ pub const Request = struct { _ = req.body.unref(); return null; } + + // hrefFromString increments the reference count if they end up being + // the same + // + // we increment the reference count on usage above, so we must + // decrement it to be perfectly balanced. + req.url.deref(); + req.url = href; if (req.body.value == .Blob and diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 01ecfad36..d947a7d4e 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -617,6 +617,7 @@ pub const Fetch = struct { http: ?*HTTPClient.AsyncHTTP = null, result: HTTPClient.HTTPClientResult = .{}, + metadata: ?HTTPClient.HTTPClientResult.ResultMetadata = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, request_body: HTTPRequestBody = undefined, @@ -641,7 +642,8 @@ pub const Fetch = struct { url_proxy_buffer: []const u8 = "", signal: ?*JSC.WebCore.AbortSignal = null, - aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + signals: HTTPClient.Signals = .{}, + signal_store: HTTPClient.Signals.Store = .{}, has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), // must be stored because AbortSignal stores reason weakly @@ -702,11 +704,19 @@ pub const Fetch = struct { this.request_headers.entries.deinit(bun.default_allocator); this.request_headers.buf.deinit(bun.default_allocator); this.request_headers = Headers{ .allocator = undefined }; - this.http.?.clearData(); - this.result.deinitMetadata(); + if (this.http != null) { + this.http.?.clearData(); + } + + if (this.metadata != null) { + this.metadata.?.deinit(); + this.metadata = null; + } + this.response_buffer.deinit(); this.response.deinit(); + this.scheduled_response_buffer.deinit(); this.request_body.detach(); @@ -725,9 +735,13 @@ pub const Fetch = struct { } pub fn onBodyReceived(this: *FetchTasklet) void { + this.mutex.lock(); const success = this.result.isSuccess(); const globalThis = this.global_this; defer { + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); + if (!success or !this.result.has_more) { var vm = globalThis.bunVM(); this.poll_ref.unref(vm); @@ -831,43 +845,42 @@ pub const Fetch = struct { pub fn onProgressUpdate(this: *FetchTasklet) void { JSC.markBinding(@src()); - this.mutex.lock(); - defer { - this.has_schedule_callback.store(false, .Monotonic); - this.mutex.unlock(); - } - if (this.is_waiting_body) { return this.onBodyReceived(); } + this.mutex.lock(); const globalThis = this.global_this; var ref = this.promise; const promise_value = ref.value(); - defer ref.strong.deinit(); var poll_ref = this.poll_ref; var vm = globalThis.bunVM(); if (promise_value.isEmptyOrUndefinedOrNull()) { + ref.strong.deinit(); + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); poll_ref.unref(vm); this.clearData(); this.deinit(); return; } + const promise = promise_value.asAnyPromise().?; + const tracker = this.tracker; + tracker.willDispatch(globalThis); defer { + tracker.didDispatch(globalThis); + ref.strong.deinit(); + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); if (!this.is_waiting_body) { poll_ref.unref(vm); this.clearData(); this.deinit(); } } - - const promise = promise_value.asAnyPromise().?; - const tracker = this.tracker; - tracker.willDispatch(globalThis); - defer tracker.didDispatch(globalThis); const success = this.result.isSuccess(); const result = switch (success) { true => this.onResolve(), @@ -907,6 +920,16 @@ pub const Fetch = struct { return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); } + var path: bun.String = undefined; + + if (this.metadata) |metadata| { + path = bun.String.create(metadata.href); + } else if (this.http) |http| { + path = bun.String.create(http.url.href); + } else { + path = bun.String.empty; + } + const fetch_error = JSC.SystemError{ .code = bun.String.static(@errorName(this.result.fail)), .message = switch (this.result.fail) { @@ -916,7 +939,7 @@ pub const Fetch = struct { error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"), else => bun.String.static("fetch() failed. For more information, pass `verbose: true` in the second argument to fetch()"), }, - .path = bun.String.create(this.http.?.url.href), + .path = path, }; return fetch_error.toErrorInstance(this.global_this); @@ -927,7 +950,7 @@ pub const Fetch = struct { if (this.http) |http| { http.enableBodyStreaming(); } - if (this.aborted.load(.Acquire)) { + if (this.signal_store.aborted.load(.Monotonic)) { return JSC.WebCore.DrainResult{ .aborted = {}, }; @@ -1000,21 +1023,27 @@ pub const Fetch = struct { } fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response { - const http_response = this.result.response; - this.is_waiting_body = this.result.has_more; - return Response{ - .allocator = allocator, - .url = bun.String.createAtomIfPossible(this.result.href), - .status_text = bun.String.createAtomIfPossible(http_response.status), - .redirected = this.result.redirected, - .body = .{ - .init = .{ - .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), - .status_code = @as(u16, @truncate(http_response.status_code)), + // at this point we always should have metadata + std.debug.assert(this.metadata != null); + if (this.metadata) |metadata| { + const http_response = metadata.response; + this.is_waiting_body = this.result.has_more; + return Response{ + .allocator = allocator, + .url = bun.String.createAtomIfPossible(metadata.href), + .status_text = bun.String.createAtomIfPossible(http_response.status), + .redirected = this.result.redirected, + .body = .{ + .init = .{ + .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), + .status_code = @as(u16, @truncate(http_response.status_code)), + }, + .value = this.toBodyValue(), }, - .value = this.toBodyValue(), - }, - }; + }; + } + + @panic("fetch metadata should be provided"); } pub fn onResolve(this: *FetchTasklet) JSValue { @@ -1063,6 +1092,7 @@ pub const Fetch = struct { .hostname = fetch_options.hostname, .tracker = JSC.AsyncTaskTracker.init(jsc_vm), }; + fetch_tasklet.signals = fetch_tasklet.signal_store.to(); fetch_tasklet.tracker.didSchedule(globalThis); @@ -1079,6 +1109,10 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } + if (fetch_tasklet.signal == null) { + fetch_tasklet.signals.aborted = null; + } + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( allocator, fetch_options.method, @@ -1095,9 +1129,10 @@ pub const Fetch = struct { fetch_tasklet, ), proxy, - if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + fetch_options.hostname, fetch_options.redirect_type, + fetch_tasklet.signals, ); if (fetch_options.redirect_type != FetchRedirect.follow) { @@ -1108,7 +1143,7 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; // we wanna to return after headers are received - fetch_tasklet.http.?.signalHeaderProgress(); + fetch_tasklet.signal_store.header_progress.store(true, .Monotonic); if (fetch_tasklet.request_body == .Sendfile) { std.debug.assert(fetch_options.url.isHTTP()); @@ -1127,7 +1162,7 @@ pub const Fetch = struct { reason.ensureStillAlive(); this.abort_reason = reason; reason.protect(); - this.aborted.store(true, .Monotonic); + this.signal_store.aborted.store(true, .Monotonic); this.tracker.didCancel(this.global_this); if (this.http != null) { @@ -1180,11 +1215,14 @@ pub const Fetch = struct { task.mutex.lock(); defer task.mutex.unlock(); task.result = result; + // metadata should be provided only once so we preserve it until we consume it + if (result.metadata) |metadata| { + task.metadata = metadata; + } task.body_size = result.body_size; const success = result.isSuccess(); task.response_buffer = result.body.?.*; - if (success) { _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM"); } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 955d10ffb..771d34db0 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1915,6 +1915,34 @@ pub const ArrayBufferSink = struct { pub const JSSink = NewJSSink(@This(), "ArrayBufferSink"); }; +const AutoFlusher = struct { + registered: bool = false, + + pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + if (this.auto_flusher.registered) return; + this.auto_flusher.registered = true; + std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush))); + } + + pub fn unregisterDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + if (!this.auto_flusher.registered) return; + this.auto_flusher.registered = false; + std.debug.assert(vm.eventLoop().unregisterDeferredTask(this)); + } + + pub fn unregisterDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + std.debug.assert(this.auto_flusher.registered); + std.debug.assert(vm.eventLoop().unregisterDeferredTask(this)); + this.auto_flusher.registered = false; + } + + pub fn registerDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + std.debug.assert(!this.auto_flusher.registered); + this.auto_flusher.registered = true; + std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush))); + } +}; + pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { return struct { sink: SinkType, @@ -2357,6 +2385,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { end_len: usize = 0, aborted: bool = false, + onFirstWrite: ?*const fn (?*anyopaque) void = null, + ctx: ?*anyopaque = null, + + auto_flusher: AutoFlusher = AutoFlusher{}, + const log = Output.scoped(.HTTPServerWritable, false); pub fn connect(this: *@This(), signal: Signal) void { @@ -2375,15 +2408,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } } + fn handleFirstWriteIfNecessary(this: *@This()) void { + if (this.onFirstWrite) |onFirstWrite| { + var ctx = this.ctx; + this.ctx = null; + this.onFirstWrite = null; + onFirstWrite(ctx); + } + } + fn hasBackpressure(this: *const @This()) bool { return this.has_backpressure; } - fn send(this: *@This(), buf: []const u8) bool { + fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool { std.debug.assert(!this.done); defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure }); if (this.requested_end and !this.res.state().isHttpWriteCalled()) { + this.handleFirstWriteIfNecessary(); const success = this.res.tryEnd(buf, this.end_len, false); this.has_backpressure = !success; return success; @@ -2395,10 +2438,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { // so in this scenario, we just append to the buffer // and report success if (this.requested_end) { + this.handleFirstWriteIfNecessary(); this.res.end(buf, false); this.has_backpressure = false; return true; } else { + this.handleFirstWriteIfNecessary(); this.has_backpressure = !this.res.write(buf); if (this.has_backpressure) { this.res.onWritable(*@This(), onWritable, this); @@ -2409,6 +2454,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { unreachable; } + fn send(this: *@This(), buf: []const u8) bool { + this.unregisterAutoFlusher(); + return this.sendWithoutAutoFlusher(buf); + } + fn readableSlice(this: *@This()) []const u8 { return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len]; } @@ -2464,7 +2514,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn start(this: *@This(), stream_start: StreamStart) JSC.Node.Maybe(void) { if (this.aborted or this.res.hasResponded()) { - this.done = true; + this.markDone(); this.signal.close(null); return .{ .result = {} }; } @@ -2529,6 +2579,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { log("flushFromJS({any})", .{wait}); + this.unregisterAutoFlusher(); + if (!wait) { return this.flushFromJSNoWait(); } @@ -2563,12 +2615,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn flush(this: *@This()) JSC.Node.Maybe(void) { log("flush()", .{}); + this.unregisterAutoFlusher(); + if (!this.hasBackpressure() or this.done) { return .{ .result = {} }; } if (this.res.hasResponded()) { - this.done = true; + this.markDone(); this.signal.close(null); } @@ -2596,6 +2650,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); } else if (this.buffer.len + len >= this.highWaterMark) { // TODO: attempt to write both in a corked buffer? _ = this.buffer.write(this.allocator, bytes) catch { @@ -2613,9 +2668,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); return .{ .owned = len }; } + this.registerAutoFlusher(); this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; @@ -2628,7 +2685,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.res.hasResponded()) { this.signal.close(null); - this.done = true; + this.markDone(); return .{ .done = {} }; } @@ -2676,9 +2733,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.writeLatin1(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); return .{ .owned = len }; } + this.registerAutoFlusher(); this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; @@ -2690,7 +2749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.res.hasResponded()) { this.signal.close(null); - this.done = true; + this.markDone(); return .{ .done = {} }; } @@ -2715,9 +2774,15 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.onWritable(*@This(), onWritable, this); } + this.registerAutoFlusher(); return .{ .owned = @as(Blob.SizeType, @intCast(written)) }; } + pub fn markDone(this: *@This()) void { + this.done = true; + this.unregisterAutoFlusher(); + } + // In this case, it's always an error pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) { log("end({any})", .{err}); @@ -2728,7 +2793,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.done or this.res.hasResponded()) { this.signal.close(err); - this.done = true; + this.markDone(); this.finalize(); return .{ .result = {} }; } @@ -2739,7 +2804,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (readable.len == 0) { this.signal.close(err); - this.done = true; + this.markDone(); // we do not close the stream here // this.res.endStream(false); this.finalize(); @@ -2759,7 +2824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.done or this.res.hasResponded()) { this.requested_end = true; this.signal.close(null); - this.done = true; + this.markDone(); this.finalize(); return .{ .result = JSC.JSValue.jsNumber(0) }; } @@ -2780,10 +2845,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.end("", false); } - this.done = true; + this.markDone(); this.flushPromise(); this.signal.close(null); - this.done = true; this.finalize(); return .{ .result = JSC.JSValue.jsNumber(this.wrote) }; @@ -2796,12 +2860,50 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn abort(this: *@This()) void { log("onAborted()", .{}); this.done = true; + this.unregisterAutoFlusher(); + this.aborted = true; this.signal.close(null); + this.flushPromise(); this.finalize(); } + fn unregisterAutoFlusher(this: *@This()) void { + if (this.auto_flusher.registered) + AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM()); + } + + fn registerAutoFlusher(this: *@This()) void { + if (!this.auto_flusher.registered) + AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM()); + } + + pub fn onAutoFlush(this: *@This()) bool { + log("onAutoFlush()", .{}); + if (this.done) { + this.auto_flusher.registered = false; + return false; + } + + const readable = this.readableSlice(); + + if (this.hasBackpressure() or readable.len == 0) { + this.auto_flusher.registered = false; + return false; + } + + if (!this.sendWithoutAutoFlusher(readable)) { + this.auto_flusher.registered = true; + this.res.onWritable(*@This(), onWritable, this); + return true; + } + + this.handleWrote(readable.len); + this.auto_flusher.registered = false; + return false; + } + pub fn destroy(this: *@This()) void { log("destroy()", .{}); var bytes = this.buffer.listManaged(this.allocator); @@ -2810,6 +2912,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { bytes.deinit(); } + this.unregisterAutoFlusher(); + this.allocator.destroy(this); } @@ -2820,6 +2924,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (!this.done) { this.done = true; + this.unregisterAutoFlusher(); this.res.endStream(false); } |