diff options
author | 2023-08-23 14:05:05 -0700 | |
---|---|---|
committer | 2023-08-23 14:05:05 -0700 | |
commit | c60385716b7a7ac9f788cdf7dfe37250321e0670 (patch) | |
tree | b08cc97e7e9d456efac7ec83d4862c8a8e3043bf /src | |
parent | f3266ff436e0ed2aedd0d81f47a1ef104191a2c9 (diff) | |
download | bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.gz bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.zst bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.zip |
Bunch of streams fixes (#4251)
* Update WebKit
* Don't do async hooks things when async hooks are not enabled
* Smarter scheduling of event loop tasks with the http server
* less exciting approach
* Bump WebKit
* Another approach
* Fix body-stream tests
* Fixes #1886
* Fix UAF in fetch body streaming
* Missing from commit
* Fix leak
* Fix the other leak
* Fix test
* Fix crash
* missing duperef
* Make this code clearer
* Ignore empty chunks
* Fixes #3969
* Delete flaky test
* Update bun-linux-build.yml
* Fix memory issue
* fix result body, and .done status before the last callback, dont touch headers after sent once
* refactor HTTPClientResult
* less flasky corrupted test
* oops
* fix mutex invalid state
* fix onProgressUpdate deinit/unlock
* fix onProgressUpdate deinit/unlock
* oops
* remove verbose
* fix posible null use
* avoid http null
* metadata can still be used onReject after toResponse
* dont leak task.http
* fix flask tests
* less flask close tests
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: cirospaciari <ciro.spaciari@gmail.com>
Diffstat (limited to 'src')
m--------- | src/bun.js/WebKit | 0 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 295 | ||||
-rw-r--r-- | src/bun.js/bindings/AsyncContextFrame.cpp | 8 | ||||
-rw-r--r-- | src/bun.js/bindings/ZigGlobalObject.cpp | 14 | ||||
-rw-r--r-- | src/bun.js/event_loop.zig | 49 | ||||
-rw-r--r-- | src/bun.js/javascript.zig | 4 | ||||
-rw-r--r-- | src/bun.js/webcore/body.zig | 19 | ||||
-rw-r--r-- | src/bun.js/webcore/request.zig | 20 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 108 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 125 | ||||
-rw-r--r-- | src/cli/test_command.zig | 4 | ||||
-rw-r--r-- | src/http_client_async.zig | 229 | ||||
-rw-r--r-- | src/install/install.zig | 4 | ||||
-rw-r--r-- | src/js/builtins/ReadableStreamDefaultReader.ts | 14 | ||||
-rw-r--r-- | src/js/node/async_hooks.ts | 7 | ||||
-rw-r--r-- | src/js/out/InternalModuleRegistryConstants.h | 6 |
16 files changed, 637 insertions, 269 deletions
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit -Subproject fd79ce3120a692f4aed314c3da3dd452b4aa865 +Subproject 48c1316e907ca597e27e5a7624160dc18a4df8e diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 45c82b9fa..d6a9e1c6e 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1016,7 +1016,7 @@ fn NewFlags(comptime debug_mode: bool) type { is_transfer_encoding: bool = false, /// Used to identify if request can be safely deinitialized - is_waiting_body: bool = false, + is_waiting_for_request_body: bool = false, /// Used in renderMissing in debug mode to show the user an HTML page /// Used to avoid looking at the uws.Request struct after it's been freed is_web_browser_navigation: if (debug_mode) bool else void = if (debug_mode) false else {}, @@ -1080,9 +1080,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp /// When the response body is a temporary value response_buf_owned: std.ArrayListUnmanaged(u8) = .{}, + /// Defer finalization until after the request handler task is completed? + defer_deinit_until_callback_completes: ?*bool = null, + // TODO: support builtin compression const can_sendfile = !ssl_enabled; + pub inline fn isAsync(this: *const RequestContext) bool { + return this.defer_deinit_until_callback_completes == null; + } + + fn drainMicrotasks(this: *const RequestContext) void { + if (this.isAsync()) return; + this.server.vm.drainMicrotasks(); + } + pub fn setAbortHandler(this: *RequestContext) void { if (this.flags.has_abort_handler) return; if (this.resp) |resp| { @@ -1320,8 +1332,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.end(data, closeConnection); @@ -1331,8 +1343,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn endStream(this: *RequestContext, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.endStream(closeConnection); @@ -1342,8 +1354,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void { if (this.resp) |resp| { - if (this.flags.is_waiting_body) { - this.flags.is_waiting_body = false; + if (this.flags.is_waiting_for_request_body) { + this.flags.is_waiting_for_request_body = false; resp.clearOnData(); } resp.endWithoutBody(closeConnection); @@ -1562,9 +1574,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // if we are waiting for the body yet and the request was not aborted we can safely clear the onData callback if (this.resp) |resp| { - if (this.flags.is_waiting_body and this.flags.aborted == false) { + if (this.flags.is_waiting_for_request_body and this.flags.aborted == false) { resp.clearOnData(); - this.flags.is_waiting_body = false; + this.flags.is_waiting_for_request_body = false; } } } @@ -1576,6 +1588,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn deinit(this: *RequestContext) void { + if (this.defer_deinit_until_callback_completes) |defer_deinit| { + defer_deinit.* = true; + ctxLog("deferred deinit <d> ({*})<r>", .{this}); + return; + } + ctxLog("deinit<d> ({*})<r>", .{this}); if (comptime Environment.allow_assert) std.debug.assert(this.flags.finalized); @@ -1953,6 +1971,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream }; + fn handleFirstStreamWrite(this: *@This()) void { + if (!this.flags.has_written_status) { + this.renderMetadata(); + } + } + fn doRenderStream(pair: *StreamPair) void { var this = pair.this; var stream = pair.stream; @@ -1963,20 +1987,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } const resp = this.resp.?; - // uWS automatically adds the status line if needed - // we want to batch network calls as much as possible - if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) { - this.renderMetadata(); - } - stream.value.ensureStillAlive(); var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable; + var globalThis = this.server.globalThis; response_stream.* = ResponseStream.JSSink{ .sink = .{ .res = resp, .allocator = this.allocator, .buffer = bun.ByteList{}, + .onFirstWrite = @ptrCast(&handleFirstStreamWrite), + .ctx = this, + .globalThis = globalThis, }, }; var signal = &response_stream.sink.signal; @@ -1991,13 +2013,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // We are already corked! const assignment_result: JSValue = ResponseStream.JSSink.assignToStream( - this.server.globalThis, + globalThis, stream.value, response_stream, @as(**anyopaque, @ptrCast(&signal.ptr)), ); assignment_result.ensureStillAlive(); + // assert that it was updated std.debug.assert(!signal.isDead()); @@ -2015,32 +2038,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp response_stream.detach(); this.sink = null; response_stream.sink.destroy(); - stream.value.unprotect(); return this.handleReject(err_value); } - if (response_stream.sink.done or - // TODO: is there a condition where resp could be freed before done? - resp.hasResponded()) - { + if (resp.hasResponded()) { if (!this.flags.aborted) resp.clearAborted(); - const wrote_anything = response_stream.sink.wrote > 0; - streamLog("is done", .{}); - const responded = resp.hasResponded(); - + streamLog("done", .{}); response_stream.detach(); this.sink = null; response_stream.sink.destroy(); - if (!responded and !wrote_anything and !this.flags.aborted) { - this.renderMissing(); - return; - } else if (wrote_anything and !responded and !this.flags.aborted) { - this.endStream(this.shouldCloseConnection()); - } - + this.endStream(this.shouldCloseConnection()); this.finalize(); stream.value.unprotect(); - return; } @@ -2049,19 +2058,28 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // it returns a Promise when it goes through ReadableStreamDefaultReader if (assignment_result.asAnyPromise()) |promise| { streamLog("returned a promise", .{}); - switch (promise.status(this.server.globalThis.vm())) { + this.drainMicrotasks(); + + switch (promise.status(globalThis.vm())) { .Pending => { streamLog("promise still Pending", .{}); + if (!this.flags.has_written_status) { + response_stream.sink.onFirstWrite = null; + response_stream.sink.ctx = null; + this.renderMetadata(); + } + // TODO: should this timeout? this.setAbortHandler(); + this.pending_promises_for_abort += 1; this.response_ptr.?.body.value = .{ .Locked = .{ .readable = stream, - .global = this.server.globalThis, + .global = globalThis, }, }; assignment_result.then( - this.server.globalThis, + globalThis, this, onResolveStream, onRejectStream, @@ -2071,11 +2089,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }, .Fulfilled => { streamLog("promise Fulfilled", .{}); + defer stream.value.unprotect(); + this.handleResolveStream(); }, .Rejected => { streamLog("promise Rejected", .{}); - this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm())); + defer stream.value.unprotect(); + + this.handleRejectStream(globalThis, promise.result(globalThis.vm())); }, } return; @@ -2084,22 +2106,23 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (this.flags.aborted) { response_stream.detach(); - stream.cancel(this.server.globalThis); - response_stream.sink.done = true; + stream.cancel(globalThis); + defer stream.value.unprotect(); + response_stream.sink.markDone(); this.finalizeForAbort(); response_stream.sink.finalize(); - stream.value.unprotect(); return; } stream.value.ensureStillAlive(); + defer stream.value.unprotect(); const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and response_stream.sink.buffer.len == 0); - if (!stream.isLocked(this.server.globalThis) and !is_in_progress) { - if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| { + if (!stream.isLocked(globalThis) and !is_in_progress) { + if (JSC.WebCore.ReadableStream.fromJS(stream.value, globalThis)) |comparator| { if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) { streamLog("is not locked", .{}); this.renderMissing(); @@ -2111,7 +2134,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.setAbortHandler(); streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); this.finalize(); - stream.value.unprotect(); } const streamLog = Output.scoped(.ReadableStream, false); @@ -2120,6 +2142,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return @intFromPtr(this.upgrade_context) == std.math.maxInt(usize); } + fn toAsyncWithoutAbortHandler(ctx: *RequestContext, req: *uws.Request, request_object: *Request) void { + request_object.uws_request = req; + + request_object.ensureURL() catch { + request_object.url = bun.String.empty; + }; + + // we have to clone the request headers here since they will soon belong to a different request + if (request_object.headers == null) { + request_object.headers = JSC.FetchHeaders.createFromUWS(ctx.server.globalThis, req); + } + + // This object dies after the stack frame is popped + // so we have to clear it in here too + request_object.uws_request = null; + } + + fn toAsync( + ctx: *RequestContext, + req: *uws.Request, + request_object: *Request, + ) void { + ctxLog("toAsync", .{}); + ctx.toAsyncWithoutAbortHandler(req, request_object); + if (comptime debug_mode) { + ctx.pathname = request_object.url.clone(); + } + ctx.setAbortHandler(); + } + + // Each HTTP request or TCP socket connection is effectively a "task". + // + // However, unlike the regular task queue, we don't drain the microtask + // queue at the end. + // + // Instead, we drain it multiple times, at the points that would + // otherwise "halt" the Response from being rendered. + // + // - If you return a Promise, we drain the microtask queue once + // - If you return a streaming Response, we drain the microtask queue (possibly the 2nd time this task!) pub fn onResponse( ctx: *RequestContext, this: *ThisServer, @@ -2128,8 +2190,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp request_value: JSValue, response_value: JSValue, ) void { + _ = request_object; + _ = req; request_value.ensureStillAlive(); response_value.ensureStillAlive(); + ctx.drainMicrotasks(); if (ctx.flags.aborted) { ctx.finalizeForAbort(); @@ -2159,6 +2224,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.response_jsvalue = response_value; ctx.response_jsvalue.ensureStillAlive(); ctx.flags.response_protected = false; + response.body.value.toBlobIfPossible(); switch (response.body.value) { @@ -2210,6 +2276,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp ctx.response_jsvalue.ensureStillAlive(); ctx.flags.response_protected = false; ctx.response_ptr = response; + response.body.value.toBlobIfPossible(); switch (response.body.value) { .Blob => |*blob| { @@ -2236,35 +2303,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (wait_for_promise) { - request_object.uws_request = req; - - request_object.ensureURL() catch { - request_object.url = bun.String.empty; - }; - - // we have to clone the request headers here since they will soon belong to a different request - if (request_object.headers == null) { - request_object.headers = JSC.FetchHeaders.createFromUWS(this.globalThis, req); - } - - if (comptime debug_mode) { - ctx.pathname = request_object.url.clone(); - } - - // This object dies after the stack frame is popped - // so we have to clear it in here too - request_object.uws_request = null; - - ctx.setAbortHandler(); ctx.pending_promises_for_abort += 1; - response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject); return; } - if (ctx.resp) |resp| { - // The user returned something that wasn't a promise or a promise with a response - if (!resp.hasResponded() and !ctx.flags.has_marked_pending) ctx.renderMissing(); - } } pub fn handleResolveStream(req: *RequestContext) void { @@ -2276,6 +2318,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp wrapper.sink.done = true; req.flags.aborted = req.flags.aborted or wrapper.sink.aborted; wrote_anything = wrapper.sink.wrote > 0; + wrapper.sink.finalize(); wrapper.detach(); req.sink = null; @@ -2300,12 +2343,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp const responded = resp.hasResponded(); - if (!responded and !wrote_anything) { - resp.clearAborted(); - req.renderMissing(); - return; - } else if (!responded and wrote_anything) { + if (!responded) { resp.clearAborted(); + if (!req.flags.has_written_status) { + req.renderMetadata(); + } req.endStream(req.shouldCloseConnection()); } @@ -2316,6 +2358,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onResolveStream", .{}); var args = callframe.arguments(2); var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This()); + req.pending_promises_for_abort -|= 1; req.handleResolveStream(); return JSValue.jsUndefined(); } @@ -2323,19 +2366,19 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp streamLog("onRejectStream", .{}); const args = callframe.arguments(2); var req = args.ptr[args.len - 1].asPromisePtr(@This()); + req.pending_promises_for_abort -|= 1; var err = args.ptr[0]; req.handleRejectStream(globalThis, err); return JSValue.jsUndefined(); } pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void { + _ = globalThis; streamLog("handleRejectStream", .{}); - var wrote_anything = req.flags.has_written_status; if (req.sink) |wrapper| { wrapper.sink.pending_flush = null; wrapper.sink.done = true; - wrote_anything = wrote_anything or wrapper.sink.wrote > 0; req.flags.aborted = req.flags.aborted or wrapper.sink.aborted; wrapper.sink.finalize(); wrapper.detach(); @@ -2350,40 +2393,32 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } } - streamLog("onReject({any})", .{wrote_anything}); - - //aborted so call finalizeForAbort + // aborted so call finalizeForAbort if (req.flags.aborted) { req.finalizeForAbort(); return; } - if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) { - req.response_jsvalue.unprotect(); - req.response_jsvalue = JSValue.zero; - req.handleReject(err); - return; - } else if (wrote_anything) { - req.endStream(true); - if (comptime debug_mode) { - if (!err.isEmptyOrUndefinedOrNull()) { - var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); - defer exception_list.deinit(); - req.server.vm.runErrorHandler(err, &exception_list); - } - } - req.finalize(); - return; + streamLog("onReject()", .{}); + + if (!req.flags.has_written_status) { + req.renderMetadata(); } - const fallback = JSC.SystemError{ - .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))), - .message = bun.String.static("Unhandled error in ReadableStream"), - }; - req.handleReject(fallback.toErrorInstance(globalThis)); + req.endStream(true); + if (comptime debug_mode) { + if (!err.isEmptyOrUndefinedOrNull()) { + var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator); + defer exception_list.deinit(); + req.server.vm.runErrorHandler(err, &exception_list); + } + } + req.finalize(); } pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { + this.drainMicrotasks(); + // If a ReadableStream can trivially be converted to a Blob, do so. // If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob. value.toBlobIfPossible(); @@ -2833,8 +2868,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp std.debug.assert(this.resp == resp); - this.flags.is_waiting_body = last == false; + this.flags.is_waiting_for_request_body = last == false; if (this.flags.aborted or this.flags.has_marked_complete) return; + if (!last and chunk.len == 0) { + // Sometimes, we get back an empty chunk + // We have to ignore those chunks unless it's the last one + return; + } if (this.request_body != null) { var body = this.request_body.?; @@ -2900,6 +2940,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } if (old == .Locked) { + defer this.drainMicrotasks(); + old.resolve(&body.value, this.server.globalThis); } return; @@ -3254,7 +3296,7 @@ pub const WebSocketServer = struct { globalObject.throwInvalidArguments("websocket expects maxPayloadLength to be an integer", .{}); return null; } - server.maxPayloadLength = @as(u32, @intCast(@max(value.toInt64(), 0))); + server.maxPayloadLength = @truncate(@max(value.toInt64(), 0)); } } @@ -3265,7 +3307,7 @@ pub const WebSocketServer = struct { return null; } - var idleTimeout = @as(u16, @intCast(@as(u32, @truncate(@max(value.toInt64(), 0))))); + var idleTimeout: u16 = @truncate(@max(value.toInt64(), 0)); if (idleTimeout > 960) { globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{}); return null; @@ -3285,7 +3327,7 @@ pub const WebSocketServer = struct { return null; } - server.backpressureLimit = @as(u32, @intCast(@max(value.toInt64(), 0))); + server.backpressureLimit = @truncate(@max(value.toInt64(), 0)); } } @@ -5271,6 +5313,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp ) void { JSC.markBinding(@src()); this.pending_requests += 1; + req.setYield(false); var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory"); ctx.create(this, req, resp); @@ -5337,7 +5380,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp .onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback, }, }; - ctx.flags.is_waiting_body = true; + ctx.flags.is_waiting_for_request_body = true; + resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx); } } @@ -5352,7 +5396,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); + defer { + // uWS request will not live longer than this function + request_object.uws_request = null; + } + var should_deinit_context = false; + ctx.defer_deinit_until_callback_completes = &should_deinit_context; ctx.onResponse( this, req, @@ -5360,8 +5410,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value, response_value, ); - // uWS request will not live longer than this function - request_object.uws_request = null; + ctx.defer_deinit_until_callback_completes = null; + + if (should_deinit_context) { + request_object.uws_request = null; + ctx.deinit(); + return; + } + + if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) { + ctx.renderMissing(); + return; + } + + ctx.toAsync(req, request_object); } pub fn onWebSocketUpgrade( @@ -5404,7 +5466,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp const request_value = args[0]; request_value.ensureStillAlive(); const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args); + defer { + // uWS request will not live longer than this function + request_object.uws_request = null; + } + var should_deinit_context = false; + ctx.defer_deinit_until_callback_completes = &should_deinit_context; ctx.onResponse( this, req, @@ -5412,9 +5480,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp request_value, response_value, ); + ctx.defer_deinit_until_callback_completes = null; - // uWS request will not live longer than this function - request_object.uws_request = null; + if (should_deinit_context) { + request_object.uws_request = null; + ctx.deinit(); + return; + } + + if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) { + ctx.renderMissing(); + return; + } + + ctx.toAsync(req, request_object); } pub fn listen(this: *ThisServer) void { diff --git a/src/bun.js/bindings/AsyncContextFrame.cpp b/src/bun.js/bindings/AsyncContextFrame.cpp index 2a103a8d1..1c541b2a8 100644 --- a/src/bun.js/bindings/AsyncContextFrame.cpp +++ b/src/bun.js/bindings/AsyncContextFrame.cpp @@ -97,10 +97,18 @@ extern "C" EncodedJSValue AsyncContextFrame__withAsyncContextIfNeeded(JSGlobalOb // } JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args) { + if (LIKELY(!global->isAsyncContextTrackingEnabled())) { + return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args); + } + ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args); } JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args, NakedPtr<Exception>& returnedException) { + if (LIKELY(!global->isAsyncContextTrackingEnabled())) { + return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException); + } + ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException); } diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 0ecafeae4..d3bd623dd 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -1411,6 +1411,16 @@ JSC_DEFINE_HOST_FUNCTION(asyncHooksCleanupLater, (JSC::JSGlobalObject * globalOb return JSC::JSValue::encode(JSC::jsUndefined()); } +JSC_DEFINE_HOST_FUNCTION(asyncHooksSetEnabled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + // assumptions and notes: + // - nobody else uses setOnEachMicrotaskTick + // - this is called by js if we set async context in a way we may not clear it + // - AsyncLocalStorage.prototype.run cleans up after itself and does not call this cb + globalObject->setAsyncContextTrackingEnabled(callFrame->argument(0).toBoolean(globalObject)); + return JSC::JSValue::encode(JSC::jsUndefined()); +} + extern "C" int Bun__ttySetMode(int fd, int mode); JSC_DEFINE_HOST_FUNCTION(jsTTYSetMode, (JSC::JSGlobalObject * globalObject, CallFrame* callFrame)) @@ -1689,6 +1699,10 @@ static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad, if (string == "async_hooks"_s) { auto* obj = constructEmptyObject(globalObject); obj->putDirect( + vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setAsyncHooksEnabled"_s)), + JSC::JSFunction::create(vm, globalObject, 0, "setAsyncHooksEnabled"_s, asyncHooksSetEnabled, ImplementationVisibility::Public), 0); + + obj->putDirect( vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "cleanupLater"_s)), JSC::JSFunction::create(vm, globalObject, 0, "cleanupLater"_s, asyncHooksCleanupLater, ImplementationVisibility::Public), 0); return JSValue::encode(obj); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 92874b6a4..896297060 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -509,6 +509,7 @@ comptime { } } +pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool); pub const EventLoop = struct { tasks: Queue = undefined, concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, @@ -518,6 +519,7 @@ pub const EventLoop = struct { start_server_on_next_tick: bool = false, defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), forever_timer: ?*uws.Timer = null, + deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{}, pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); const log = bun.Output.scoped(.EventLoop, false); @@ -528,6 +530,49 @@ pub const EventLoop = struct { } } + pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void { + vm.drainMicrotasks(); + this.drainDeferredTasks(); + } + + pub fn drainMicrotasks(this: *EventLoop) void { + this.drainMicrotasksWithVM(this.global.vm()); + } + + pub fn ensureAliveForOneTick(this: *EventLoop) void { + if (this.noop_task.scheduled) return; + this.enqueueTask(Task.init(&this.noop_task)); + this.noop_task.scheduled = true; + } + + pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool { + const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable; + return existing.found_existing; + } + + pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool { + return this.deferred_microtask_map.swapRemove(ctx); + } + + fn drainDeferredTasks(this: *EventLoop) void { + var i: usize = 0; + var last = this.deferred_microtask_map.count(); + while (i < last) { + var key = this.deferred_microtask_map.keys()[i] orelse { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + continue; + }; + + if (!this.deferred_microtask_map.values()[i](key)) { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + } else { + i += 1; + } + } + } + pub fn tickWithCount(this: *EventLoop) u32 { var global = this.global; var global_vm = global.vm(); @@ -621,7 +666,7 @@ pub const EventLoop = struct { } global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); } this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head; @@ -758,7 +803,7 @@ pub const EventLoop = struct { this.tickConcurrent(); } else { global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); this.tickConcurrent(); if (this.tasks.count > 0) continue; } diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index a016129e2..492b9fbee 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -1742,6 +1742,10 @@ pub const VirtualMachine = struct { ret.success = true; } + pub fn drainMicrotasks(this: *VirtualMachine) void { + this.eventLoop().drainMicrotasks(); + } + pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void { switch (log.msgs.items.len) { 0 => { diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index fa0ec9b24..86462dd04 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -249,10 +249,7 @@ pub const Body = struct { pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue { value.action = action; - if (value.readable) |readable| { - // switch (readable.ptr) { - // .JavaScript - // } + if (value.readable) |readable| handle_stream: { switch (action) { .getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer => { value.promise = switch (action) { @@ -261,6 +258,20 @@ pub const Body = struct { .getText => globalThis.readableStreamToText(readable.value), .getBlob => globalThis.readableStreamToBlob(readable.value), .getFormData => |form_data| brk: { + if (value.onStartBuffering != null) { + if (readable.isDisturbed(globalThis)) { + form_data.?.deinit(); + readable.value.unprotect(); + value.readable = null; + value.action = .{ .none = {} }; + return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{})); + } else { + readable.detach(globalThis); + value.readable = null; + } + + break :handle_stream; + } defer { form_data.?.deinit(); value.action.getFormData = null; diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index c01e72d60..aaa3f6b79 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -485,7 +485,8 @@ pub const Request = struct { _ = req.body.unref(); return null; }; - req.url = str; + req.url = str.dupeRef(); + if (!req.url.isEmpty()) fields.insert(.url); } else if (!url_or_object_type.isObject()) { @@ -554,7 +555,7 @@ pub const Request = struct { if (!fields.contains(.url)) { if (!response.url.isEmpty()) { - req.url = response.url; + req.url = response.url.dupeRef(); fields.insert(.url); } } @@ -586,7 +587,7 @@ pub const Request = struct { if (!fields.contains(.url)) { if (value.fastGet(globalThis, .url)) |url| { - req.url = bun.String.fromJS(url, globalThis); + req.url = bun.String.fromJS(url, globalThis).dupeRef(); if (!req.url.isEmpty()) fields.insert(.url); @@ -599,7 +600,7 @@ pub const Request = struct { _ = req.body.unref(); return null; }; - req.url = str; + req.url = str.dupeRef(); if (!req.url.isEmpty()) fields.insert(.url); } @@ -648,9 +649,10 @@ pub const Request = struct { return null; } - // Note that the string is going to be ref'd here, so we don't need to ref it above. const href = JSC.URL.hrefFromString(req.url); if (href.isEmpty()) { + // globalThis.throw can cause GC, which could cause the above string to be freed. + // so we must increment the reference count before calling it. globalThis.throw("Failed to construct 'Request': Invalid URL \"{}\"", .{ req.url, }); @@ -658,6 +660,14 @@ pub const Request = struct { _ = req.body.unref(); return null; } + + // hrefFromString increments the reference count if they end up being + // the same + // + // we increment the reference count on usage above, so we must + // decrement it to be perfectly balanced. + req.url.deref(); + req.url = href; if (req.body.value == .Blob and diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 01ecfad36..d947a7d4e 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -617,6 +617,7 @@ pub const Fetch = struct { http: ?*HTTPClient.AsyncHTTP = null, result: HTTPClient.HTTPClientResult = .{}, + metadata: ?HTTPClient.HTTPClientResult.ResultMetadata = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, request_body: HTTPRequestBody = undefined, @@ -641,7 +642,8 @@ pub const Fetch = struct { url_proxy_buffer: []const u8 = "", signal: ?*JSC.WebCore.AbortSignal = null, - aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + signals: HTTPClient.Signals = .{}, + signal_store: HTTPClient.Signals.Store = .{}, has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), // must be stored because AbortSignal stores reason weakly @@ -702,11 +704,19 @@ pub const Fetch = struct { this.request_headers.entries.deinit(bun.default_allocator); this.request_headers.buf.deinit(bun.default_allocator); this.request_headers = Headers{ .allocator = undefined }; - this.http.?.clearData(); - this.result.deinitMetadata(); + if (this.http != null) { + this.http.?.clearData(); + } + + if (this.metadata != null) { + this.metadata.?.deinit(); + this.metadata = null; + } + this.response_buffer.deinit(); this.response.deinit(); + this.scheduled_response_buffer.deinit(); this.request_body.detach(); @@ -725,9 +735,13 @@ pub const Fetch = struct { } pub fn onBodyReceived(this: *FetchTasklet) void { + this.mutex.lock(); const success = this.result.isSuccess(); const globalThis = this.global_this; defer { + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); + if (!success or !this.result.has_more) { var vm = globalThis.bunVM(); this.poll_ref.unref(vm); @@ -831,43 +845,42 @@ pub const Fetch = struct { pub fn onProgressUpdate(this: *FetchTasklet) void { JSC.markBinding(@src()); - this.mutex.lock(); - defer { - this.has_schedule_callback.store(false, .Monotonic); - this.mutex.unlock(); - } - if (this.is_waiting_body) { return this.onBodyReceived(); } + this.mutex.lock(); const globalThis = this.global_this; var ref = this.promise; const promise_value = ref.value(); - defer ref.strong.deinit(); var poll_ref = this.poll_ref; var vm = globalThis.bunVM(); if (promise_value.isEmptyOrUndefinedOrNull()) { + ref.strong.deinit(); + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); poll_ref.unref(vm); this.clearData(); this.deinit(); return; } + const promise = promise_value.asAnyPromise().?; + const tracker = this.tracker; + tracker.willDispatch(globalThis); defer { + tracker.didDispatch(globalThis); + ref.strong.deinit(); + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); if (!this.is_waiting_body) { poll_ref.unref(vm); this.clearData(); this.deinit(); } } - - const promise = promise_value.asAnyPromise().?; - const tracker = this.tracker; - tracker.willDispatch(globalThis); - defer tracker.didDispatch(globalThis); const success = this.result.isSuccess(); const result = switch (success) { true => this.onResolve(), @@ -907,6 +920,16 @@ pub const Fetch = struct { return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this); } + var path: bun.String = undefined; + + if (this.metadata) |metadata| { + path = bun.String.create(metadata.href); + } else if (this.http) |http| { + path = bun.String.create(http.url.href); + } else { + path = bun.String.empty; + } + const fetch_error = JSC.SystemError{ .code = bun.String.static(@errorName(this.result.fail)), .message = switch (this.result.fail) { @@ -916,7 +939,7 @@ pub const Fetch = struct { error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"), else => bun.String.static("fetch() failed. For more information, pass `verbose: true` in the second argument to fetch()"), }, - .path = bun.String.create(this.http.?.url.href), + .path = path, }; return fetch_error.toErrorInstance(this.global_this); @@ -927,7 +950,7 @@ pub const Fetch = struct { if (this.http) |http| { http.enableBodyStreaming(); } - if (this.aborted.load(.Acquire)) { + if (this.signal_store.aborted.load(.Monotonic)) { return JSC.WebCore.DrainResult{ .aborted = {}, }; @@ -1000,21 +1023,27 @@ pub const Fetch = struct { } fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response { - const http_response = this.result.response; - this.is_waiting_body = this.result.has_more; - return Response{ - .allocator = allocator, - .url = bun.String.createAtomIfPossible(this.result.href), - .status_text = bun.String.createAtomIfPossible(http_response.status), - .redirected = this.result.redirected, - .body = .{ - .init = .{ - .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), - .status_code = @as(u16, @truncate(http_response.status_code)), + // at this point we always should have metadata + std.debug.assert(this.metadata != null); + if (this.metadata) |metadata| { + const http_response = metadata.response; + this.is_waiting_body = this.result.has_more; + return Response{ + .allocator = allocator, + .url = bun.String.createAtomIfPossible(metadata.href), + .status_text = bun.String.createAtomIfPossible(http_response.status), + .redirected = this.result.redirected, + .body = .{ + .init = .{ + .headers = FetchHeaders.createFromPicoHeaders(http_response.headers), + .status_code = @as(u16, @truncate(http_response.status_code)), + }, + .value = this.toBodyValue(), }, - .value = this.toBodyValue(), - }, - }; + }; + } + + @panic("fetch metadata should be provided"); } pub fn onResolve(this: *FetchTasklet) JSValue { @@ -1063,6 +1092,7 @@ pub const Fetch = struct { .hostname = fetch_options.hostname, .tracker = JSC.AsyncTaskTracker.init(jsc_vm), }; + fetch_tasklet.signals = fetch_tasklet.signal_store.to(); fetch_tasklet.tracker.didSchedule(globalThis); @@ -1079,6 +1109,10 @@ pub const Fetch = struct { proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url); } + if (fetch_tasklet.signal == null) { + fetch_tasklet.signals.aborted = null; + } + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( allocator, fetch_options.method, @@ -1095,9 +1129,10 @@ pub const Fetch = struct { fetch_tasklet, ), proxy, - if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null, + fetch_options.hostname, fetch_options.redirect_type, + fetch_tasklet.signals, ); if (fetch_options.redirect_type != FetchRedirect.follow) { @@ -1108,7 +1143,7 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; // we wanna to return after headers are received - fetch_tasklet.http.?.signalHeaderProgress(); + fetch_tasklet.signal_store.header_progress.store(true, .Monotonic); if (fetch_tasklet.request_body == .Sendfile) { std.debug.assert(fetch_options.url.isHTTP()); @@ -1127,7 +1162,7 @@ pub const Fetch = struct { reason.ensureStillAlive(); this.abort_reason = reason; reason.protect(); - this.aborted.store(true, .Monotonic); + this.signal_store.aborted.store(true, .Monotonic); this.tracker.didCancel(this.global_this); if (this.http != null) { @@ -1180,11 +1215,14 @@ pub const Fetch = struct { task.mutex.lock(); defer task.mutex.unlock(); task.result = result; + // metadata should be provided only once so we preserve it until we consume it + if (result.metadata) |metadata| { + task.metadata = metadata; + } task.body_size = result.body_size; const success = result.isSuccess(); task.response_buffer = result.body.?.*; - if (success) { _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM"); } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 955d10ffb..771d34db0 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1915,6 +1915,34 @@ pub const ArrayBufferSink = struct { pub const JSSink = NewJSSink(@This(), "ArrayBufferSink"); }; +const AutoFlusher = struct { + registered: bool = false, + + pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + if (this.auto_flusher.registered) return; + this.auto_flusher.registered = true; + std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush))); + } + + pub fn unregisterDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + if (!this.auto_flusher.registered) return; + this.auto_flusher.registered = false; + std.debug.assert(vm.eventLoop().unregisterDeferredTask(this)); + } + + pub fn unregisterDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + std.debug.assert(this.auto_flusher.registered); + std.debug.assert(vm.eventLoop().unregisterDeferredTask(this)); + this.auto_flusher.registered = false; + } + + pub fn registerDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void { + std.debug.assert(!this.auto_flusher.registered); + this.auto_flusher.registered = true; + std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush))); + } +}; + pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type { return struct { sink: SinkType, @@ -2357,6 +2385,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { end_len: usize = 0, aborted: bool = false, + onFirstWrite: ?*const fn (?*anyopaque) void = null, + ctx: ?*anyopaque = null, + + auto_flusher: AutoFlusher = AutoFlusher{}, + const log = Output.scoped(.HTTPServerWritable, false); pub fn connect(this: *@This(), signal: Signal) void { @@ -2375,15 +2408,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } } + fn handleFirstWriteIfNecessary(this: *@This()) void { + if (this.onFirstWrite) |onFirstWrite| { + var ctx = this.ctx; + this.ctx = null; + this.onFirstWrite = null; + onFirstWrite(ctx); + } + } + fn hasBackpressure(this: *const @This()) bool { return this.has_backpressure; } - fn send(this: *@This(), buf: []const u8) bool { + fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool { std.debug.assert(!this.done); defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure }); if (this.requested_end and !this.res.state().isHttpWriteCalled()) { + this.handleFirstWriteIfNecessary(); const success = this.res.tryEnd(buf, this.end_len, false); this.has_backpressure = !success; return success; @@ -2395,10 +2438,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { // so in this scenario, we just append to the buffer // and report success if (this.requested_end) { + this.handleFirstWriteIfNecessary(); this.res.end(buf, false); this.has_backpressure = false; return true; } else { + this.handleFirstWriteIfNecessary(); this.has_backpressure = !this.res.write(buf); if (this.has_backpressure) { this.res.onWritable(*@This(), onWritable, this); @@ -2409,6 +2454,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { unreachable; } + fn send(this: *@This(), buf: []const u8) bool { + this.unregisterAutoFlusher(); + return this.sendWithoutAutoFlusher(buf); + } + fn readableSlice(this: *@This()) []const u8 { return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len]; } @@ -2464,7 +2514,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn start(this: *@This(), stream_start: StreamStart) JSC.Node.Maybe(void) { if (this.aborted or this.res.hasResponded()) { - this.done = true; + this.markDone(); this.signal.close(null); return .{ .result = {} }; } @@ -2529,6 +2579,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) { log("flushFromJS({any})", .{wait}); + this.unregisterAutoFlusher(); + if (!wait) { return this.flushFromJSNoWait(); } @@ -2563,12 +2615,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn flush(this: *@This()) JSC.Node.Maybe(void) { log("flush()", .{}); + this.unregisterAutoFlusher(); + if (!this.hasBackpressure() or this.done) { return .{ .result = {} }; } if (this.res.hasResponded()) { - this.done = true; + this.markDone(); this.signal.close(null); } @@ -2596,6 +2650,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); } else if (this.buffer.len + len >= this.highWaterMark) { // TODO: attempt to write both in a corked buffer? _ = this.buffer.write(this.allocator, bytes) catch { @@ -2613,9 +2668,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); return .{ .owned = len }; } + this.registerAutoFlusher(); this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; @@ -2628,7 +2685,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.res.hasResponded()) { this.signal.close(null); - this.done = true; + this.markDone(); return .{ .done = {} }; } @@ -2676,9 +2733,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.writeLatin1(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); return .{ .owned = len }; } + this.registerAutoFlusher(); this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; @@ -2690,7 +2749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.res.hasResponded()) { this.signal.close(null); - this.done = true; + this.markDone(); return .{ .done = {} }; } @@ -2715,9 +2774,15 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.onWritable(*@This(), onWritable, this); } + this.registerAutoFlusher(); return .{ .owned = @as(Blob.SizeType, @intCast(written)) }; } + pub fn markDone(this: *@This()) void { + this.done = true; + this.unregisterAutoFlusher(); + } + // In this case, it's always an error pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) { log("end({any})", .{err}); @@ -2728,7 +2793,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.done or this.res.hasResponded()) { this.signal.close(err); - this.done = true; + this.markDone(); this.finalize(); return .{ .result = {} }; } @@ -2739,7 +2804,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (readable.len == 0) { this.signal.close(err); - this.done = true; + this.markDone(); // we do not close the stream here // this.res.endStream(false); this.finalize(); @@ -2759,7 +2824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (this.done or this.res.hasResponded()) { this.requested_end = true; this.signal.close(null); - this.done = true; + this.markDone(); this.finalize(); return .{ .result = JSC.JSValue.jsNumber(0) }; } @@ -2780,10 +2845,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.res.end("", false); } - this.done = true; + this.markDone(); this.flushPromise(); this.signal.close(null); - this.done = true; this.finalize(); return .{ .result = JSC.JSValue.jsNumber(this.wrote) }; @@ -2796,12 +2860,50 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { pub fn abort(this: *@This()) void { log("onAborted()", .{}); this.done = true; + this.unregisterAutoFlusher(); + this.aborted = true; this.signal.close(null); + this.flushPromise(); this.finalize(); } + fn unregisterAutoFlusher(this: *@This()) void { + if (this.auto_flusher.registered) + AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM()); + } + + fn registerAutoFlusher(this: *@This()) void { + if (!this.auto_flusher.registered) + AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM()); + } + + pub fn onAutoFlush(this: *@This()) bool { + log("onAutoFlush()", .{}); + if (this.done) { + this.auto_flusher.registered = false; + return false; + } + + const readable = this.readableSlice(); + + if (this.hasBackpressure() or readable.len == 0) { + this.auto_flusher.registered = false; + return false; + } + + if (!this.sendWithoutAutoFlusher(readable)) { + this.auto_flusher.registered = true; + this.res.onWritable(*@This(), onWritable, this); + return true; + } + + this.handleWrote(readable.len); + this.auto_flusher.registered = false; + return false; + } + pub fn destroy(this: *@This()) void { log("destroy()", .{}); var bytes = this.buffer.listManaged(this.allocator); @@ -2810,6 +2912,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { bytes.deinit(); } + this.unregisterAutoFlusher(); + this.allocator.destroy(this); } @@ -2820,6 +2924,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (!this.done) { this.done = true; + this.unregisterAutoFlusher(); this.res.endStream(false); } diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 5bf48d7d9..53dd4c3c5 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -955,12 +955,12 @@ pub const TestCommand = struct { } { - vm.global.vm().drainMicrotasks(); + vm.drainMicrotasks(); var count = vm.unhandled_error_counter; vm.global.handleRejectedPromises(); while (vm.unhandled_error_counter > count) { count = vm.unhandled_error_counter; - vm.global.vm().drainMicrotasks(); + vm.drainMicrotasks(); vm.global.handleRejectedPromises(); } } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 725e960d6..26978db22 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -60,6 +60,31 @@ var shared_response_headers_buf: [256]picohttp.Header = undefined; const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n"; +pub const Signals = struct { + header_progress: ?*std.atomic.Atomic(bool) = null, + body_streaming: ?*std.atomic.Atomic(bool) = null, + aborted: ?*std.atomic.Atomic(bool) = null, + + pub const Store = struct { + header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + + pub fn to(this: *Store) Signals { + return .{ + .header_progress = &this.header_progress, + .body_streaming = &this.body_streaming, + .aborted = &this.aborted, + }; + } + }; + + pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool { + var ptr: *std.atomic.Atomic(bool) = @field(this, @tagName(field)) orelse return false; + return ptr.load(.Monotonic); + } +}; + pub const FetchRedirect = enum(u8) { follow, manual, @@ -761,12 +786,12 @@ pub fn onOpen( std.debug.assert(is_ssl == client.url.isHTTPS()); } } - if (client.aborted != null) { + if (client.signals.aborted != null) { socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable; } log("Connected {s} \n", .{client.url.href}); - if (client.hasSignalAborted()) { + if (client.signals.get(.aborted)) { client.closeAndAbort(comptime is_ssl, socket); return; } @@ -1012,6 +1037,7 @@ pub const InternalState = struct { fail: anyerror = error.NoError, request_stage: HTTPStage = .pending, response_stage: HTTPStage = .pending, + metadata_sent: bool = false, pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState { return .{ @@ -1153,13 +1179,17 @@ pub const InternalState = struct { } pub fn postProcessBody(this: *InternalState) usize { - var response = &this.pending_response; - // if it compressed with this header, it is no longer - if (this.content_encoding_i < response.headers.len) { - var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; - _ = mutable_headers.orderedRemove(this.content_encoding_i); - response.headers = mutable_headers.items; - this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i)); + + // we only touch it if we did not sent the headers yet + if (!this.metadata_sent) { + var response = &this.pending_response; + if (this.content_encoding_i < response.headers.len) { + // if it compressed with this header, it is no longer + var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len }; + _ = mutable_headers.orderedRemove(this.content_encoding_i); + response.headers = mutable_headers.items; + this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i)); + } } return this.body_out_str.?.list.items.len; @@ -1201,11 +1231,9 @@ http_proxy: ?URL = null, proxy_authorization: ?[]u8 = null, proxy_tunneling: bool = false, proxy_tunnel: ?ProxyTunnel = null, -aborted: ?*std.atomic.Atomic(bool) = null, +signals: Signals = .{}, async_http_id: u32 = 0, hostname: ?[]u8 = null, -signal_header_progress: *std.atomic.Atomic(bool), -enable_body_stream: *std.atomic.Atomic(bool), pub fn init( allocator: std.mem.Allocator, @@ -1213,10 +1241,8 @@ pub fn init( url: URL, header_entries: Headers.Entries, header_buf: string, - signal: ?*std.atomic.Atomic(bool), hostname: ?[]u8, - signal_header_progress: *std.atomic.Atomic(bool), - enable_body_stream: *std.atomic.Atomic(bool), + signals: Signals, ) HTTPClient { return HTTPClient{ .allocator = allocator, @@ -1224,10 +1250,8 @@ pub fn init( .url = url, .header_entries = header_entries, .header_buf = header_buf, - .aborted = signal, .hostname = hostname, - .signal_header_progress = signal_header_progress, - .enable_body_stream = enable_body_stream, + .signals = signals, }; } @@ -1384,8 +1408,7 @@ pub const AsyncHTTP = struct { elapsed: u64 = 0, gzip_elapsed: u64 = 0, - signal_header_progress: std.atomic.Atomic(bool), - enable_body_stream: std.atomic.Atomic(bool), + signals: Signals = .{}, pub var active_requests_count = std.atomic.Atomic(usize).init(0); pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256); @@ -1418,12 +1441,14 @@ pub const AsyncHTTP = struct { pub fn signalHeaderProgress(this: *AsyncHTTP) void { @fence(.Release); - this.client.signal_header_progress.store(true, .Release); + var progress = this.signals.header_progress orelse return; + progress.store(true, .Release); } pub fn enableBodyStreaming(this: *AsyncHTTP) void { @fence(.Release); - this.client.enable_body_stream.store(true, .Release); + var stream = this.signals.body_streaming orelse return; + stream.store(true, .Release); } pub fn clearData(this: *AsyncHTTP) void { @@ -1453,9 +1478,9 @@ pub const AsyncHTTP = struct { timeout: usize, callback: HTTPClientResult.Callback, http_proxy: ?URL, - signal: ?*std.atomic.Atomic(bool), hostname: ?[]u8, redirect_type: FetchRedirect, + signals: ?Signals, ) AsyncHTTP { var this = AsyncHTTP{ .allocator = allocator, @@ -1467,12 +1492,11 @@ pub const AsyncHTTP = struct { .response_buffer = response_buffer, .result_callback = callback, .http_proxy = http_proxy, - .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0, - .signal_header_progress = std.atomic.Atomic(bool).init(false), - .enable_body_stream = std.atomic.Atomic(bool).init(false), + .signals = signals orelse .{}, + .async_http_id = if (signals != null and signals.?.aborted != null) async_http_id.fetchAdd(1, .Monotonic) else 0, }; - this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname, &this.signal_header_progress, &this.enable_body_stream); + this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, hostname, signals orelse this.signals); this.client.async_http_id = this.async_http_id; this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; @@ -1544,7 +1568,21 @@ pub const AsyncHTTP = struct { } pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL, hostname: ?[]u8, redirect_type: FetchRedirect) AsyncHTTP { - return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy, null, hostname, redirect_type); + return @This().init( + allocator, + method, + url, + headers, + headers_buf, + response_buffer, + request_body, + timeout, + undefined, + http_proxy, + hostname, + redirect_type, + null, + ); } fn reset(this: *AsyncHTTP) !void { @@ -1646,8 +1684,10 @@ pub const AsyncHTTP = struct { if (!result.isSuccess()) { return result.fail; } - - return result.response; + std.debug.assert(result.metadata != null); + if (result.metadata) |metadata| { + return metadata.response; + } } unreachable; @@ -1659,33 +1699,34 @@ pub const AsyncHTTP = struct { var callback = this.result_callback; this.elapsed = http_thread.timer.read() -| this.elapsed; this.redirected = this.client.remaining_redirect_count != default_redirect_count; - if (!result.isSuccess()) { + if (result.isSuccess()) { + this.err = null; + if (result.metadata) |metadata| { + this.response = metadata.response; + } + this.state.store(.success, .Monotonic); + } else { this.err = result.fail; this.response = null; this.state.store(State.fail, .Monotonic); - } else { - this.err = null; - this.response = result.response; - this.state.store(.success, .Monotonic); } if (result.has_more) { callback.function(callback.ctx, result); } else { - this.client.deinit(); - - this.real.?.* = this.*; - this.real.?.response_buffer = this.response_buffer; - - log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); + { + this.client.deinit(); + defer default_allocator.destroy(this); + this.real.?.* = this.*; + this.real.?.response_buffer = this.response_buffer; - default_allocator.destroy(this); + log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); + callback.function(callback.ctx, result); + } const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); std.debug.assert(active_requests > 0); - callback.function(callback.ctx, result); - if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) { http_thread.drainEvents(); } @@ -1715,10 +1756,6 @@ pub const AsyncHTTP = struct { } }; -pub fn hasSignalAborted(this: *const HTTPClient) bool { - return (this.aborted orelse return false).load(.Monotonic); -} - pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request { var header_count: usize = 0; var header_entries = this.header_entries.slice(); @@ -1842,7 +1879,7 @@ pub fn doRedirect(this: *HTTPClient) void { tunnel.deinit(); this.proxy_tunnel = null; } - if (this.aborted != null) { + if (this.signals.aborted != null) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } return this.start(.{ .bytes = "" }, body_out_str); @@ -1874,7 +1911,7 @@ pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableStr fn start_(this: *HTTPClient, comptime is_ssl: bool) void { // Aborted before connecting - if (this.hasSignalAborted()) { + if (this.signals.get(.aborted)) { this.fail(error.Aborted); return; } @@ -1912,7 +1949,7 @@ fn printResponse(response: picohttp.Response) void { } pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { - if (this.hasSignalAborted()) { + if (this.signals.get(.aborted)) { this.closeAndAbort(is_ssl, socket); return; } @@ -2256,7 +2293,7 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { log("onData {}", .{incoming_data.len}); - if (this.hasSignalAborted()) { + if (this.signals.get(.aborted)) { this.closeAndAbort(is_ssl, socket); return; } @@ -2359,7 +2396,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u if (body_buf.len == 0) { // no body data yet, but we can report the headers - if (this.signal_header_progress.load(.Acquire)) { + if (this.signals.get(.header_progress)) { this.progressUpdate(is_ssl, ctx, socket); } return; @@ -2393,7 +2430,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } // if not reported we report partially now - if (this.signal_header_progress.load(.Acquire)) { + if (this.signals.get(.header_progress)) { this.progressUpdate(is_ssl, ctx, socket); return; } @@ -2512,7 +2549,7 @@ pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo } fn fail(this: *HTTPClient, err: anyerror) void { - if (this.aborted != null) { + if (this.signals.aborted != null) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } this.state.request_stage = .fail; @@ -2561,7 +2598,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon if (this.state.stage != .done and this.state.stage != .fail) { const is_done = this.state.isDone(); - if (this.aborted != null and is_done) { + if (this.signals.aborted != null and is_done) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } @@ -2585,22 +2622,17 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon } else if (!socket.isClosed()) { socket.close(0, null); } + this.state.reset(); this.state.response_stage = .done; this.state.request_stage = .done; this.state.stage = .done; this.proxy_tunneling = false; - if (comptime print_every > 0) { - print_every_i += 1; - if (print_every_i % print_every == 0) { - Output.prettyln("Heap stats for HTTP thread\n", .{}); - Output.flush(); - default_arena.dumpThreadStats(); - print_every_i = 0; - } - } } + result.body.?.* = body; + callback.run(result); + if (comptime print_every > 0) { print_every_i += 1; if (print_every_i % print_every == 0) { @@ -2610,25 +2642,39 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon print_every_i = 0; } } - callback.run(result); } } pub const HTTPClientResult = struct { body: ?*MutableString = null, - response: picohttp.Response = .{}, - metadata_buf: []u8 = &.{}, - href: []const u8 = "", - fail: anyerror = error.NoError, - redirected: bool = false, - headers_buf: []picohttp.Header = &.{}, has_more: bool = false, + fail: anyerror = error.NoError, + + metadata: ?ResultMetadata = null, /// For Http Client requests /// when Content-Length is provided this represents the whole size of the request /// If chunked encoded this will represent the total received size (ignoring the chunk headers) /// If is not chunked encoded and Content-Length is not provided this will be unknown body_size: BodySize = .unknown, + redirected: bool = false, + + pub const ResultMetadata = struct { + response: picohttp.Response = .{}, + metadata_buf: []u8 = &.{}, + href: []const u8 = "", + headers_buf: []picohttp.Header = &.{}, + + pub fn deinit(this: *ResultMetadata) void { + if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf); + if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf); + this.headers_buf = &.{}; + this.metadata_buf = &.{}; + this.href = ""; + this.response.headers = &.{}; + this.response.status = ""; + } + }; pub const BodySize = union(enum) { total_received: usize, @@ -2648,17 +2694,6 @@ pub const HTTPClientResult = struct { return this.fail == error.Aborted; } - pub fn deinitMetadata(this: *HTTPClientResult) void { - if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf); - if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf); - - this.headers_buf = &.{}; - this.metadata_buf = &.{}; - this.href = ""; - this.response.headers = &.{}; - this.response.status = ""; - } - pub const Callback = struct { ctx: *anyopaque, function: Function, @@ -2694,14 +2729,26 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes .{ .content_length = content_length } else .{ .unknown = {} }; + if (!this.state.metadata_sent) { + this.state.metadata_sent = true; + return HTTPClientResult{ + .metadata = .{ + .response = metadata.response, + .metadata_buf = metadata.owned_buf, + .href = metadata.url, + .headers_buf = metadata.response.headers, + }, + .body = this.state.body_out_str, + .redirected = this.remaining_redirect_count != default_redirect_count, + .fail = this.state.fail, + .has_more = this.state.fail == error.NoError and !this.state.isDone(), + .body_size = body_size, + }; + } return HTTPClientResult{ .body = this.state.body_out_str, - .response = metadata.response, - .metadata_buf = metadata.owned_buf, - .redirected = this.remaining_redirect_count != default_redirect_count, - .href = metadata.url, + .metadata = null, .fail = this.state.fail, - .headers_buf = metadata.response.headers, .has_more = this.state.fail == error.NoError and !this.state.isDone(), .body_size = body_size, }; @@ -2786,7 +2833,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con // done or streaming const is_done = this.state.total_body_received >= content_length; - if (is_done or this.enable_body_stream.load(.Acquire)) { + if (is_done or this.signals.get(.body_streaming)) { const processed = try this.state.processBodyBuffer(buffer.*); if (this.progress_node) |progress| { @@ -2848,7 +2895,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets( progress.context.maybeRefresh(); } // streaming chunks - if (this.enable_body_stream.load(.Acquire)) { + if (this.signals.get(.body_streaming)) { const processed = try this.state.processBodyBuffer(buffer); return processed > 0; } @@ -2927,7 +2974,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( try body_buffer.appendSliceExact(buffer); // streaming chunks - if (this.enable_body_stream.load(.Acquire)) { + if (this.signals.get(.body_streaming)) { const processed = try this.state.processBodyBuffer(body_buffer.*); return processed > 0; } diff --git a/src/install/install.zig b/src/install/install.zig index d444b62fc..b0cdf35c8 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -370,8 +370,8 @@ const NetworkTask = struct { this.getCompletionCallback(), this.package_manager.httpProxy(url), null, - null, HTTP.FetchRedirect.follow, + null, ); this.callback = .{ .package_manifest = .{ @@ -448,8 +448,8 @@ const NetworkTask = struct { this.getCompletionCallback(), this.package_manager.httpProxy(url), null, - null, HTTP.FetchRedirect.follow, + null, ); this.callback = .{ .extract = tarball }; } diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts index ea1a64b68..169806c52 100644 --- a/src/js/builtins/ReadableStreamDefaultReader.ts +++ b/src/js/builtins/ReadableStreamDefaultReader.ts @@ -104,9 +104,11 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau if ($getByIdDirectPrivate(controller, "closeRequested")) $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); - else if ($isReadableStreamDefaultController(controller)) + else if ($isReadableStreamDefaultController(controller)) { $readableStreamDefaultControllerCallPullIfNeeded(controller); - else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller); + } else if ($isReadableByteStreamController(controller)) { + $readableByteStreamControllerCallPullIfNeeded(controller); + } return { value: outValues, size, done: false }; } @@ -138,11 +140,13 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau var size = queue.size; $resetQueue(queue); - if ($getByIdDirectPrivate(controller, "closeRequested")) + if ($getByIdDirectPrivate(controller, "closeRequested")) { $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); - else if ($isReadableStreamDefaultController(controller)) + } else if ($isReadableStreamDefaultController(controller)) { $readableStreamDefaultControllerCallPullIfNeeded(controller); - else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller); + } else if ($isReadableByteStreamController(controller)) { + $readableByteStreamControllerCallPullIfNeeded(controller); + } return { value: value, size: size, done: false }; }; diff --git a/src/js/node/async_hooks.ts b/src/js/node/async_hooks.ts index 2a671b6a2..d04b226f8 100644 --- a/src/js/node/async_hooks.ts +++ b/src/js/node/async_hooks.ts @@ -21,7 +21,7 @@ // AsyncContextData is an immutable array managed in here, formatted [key, value, key, value] where // each key is an AsyncLocalStorage object and the value is the associated value. // -const { cleanupLater } = $lazy("async_hooks"); +const { cleanupLater, setAsyncHooksEnabled } = $lazy("async_hooks"); function get(): ReadonlyArray<any> | undefined { return $getInternalField($asyncContext, 0); @@ -34,7 +34,9 @@ function set(contextValue: ReadonlyArray<any> | undefined) { class AsyncLocalStorage { #disableCalled = false; - constructor() {} + constructor() { + setAsyncHooksEnabled(true); + } static bind(fn, ...args: any) { return this.snapshot().bind(null, fn, ...args); @@ -160,6 +162,7 @@ class AsyncResource { if (typeof type !== "string") { throw new TypeError('The "type" argument must be of type string. Received type ' + typeof type); } + setAsyncHooksEnabled(true); this.type = type; this.#snapshot = get(); } diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h index 20a7693b1..dea913eb4 100644 --- a/src/js/out/InternalModuleRegistryConstants.h +++ b/src/js/out/InternalModuleRegistryConstants.h @@ -30,7 +30,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\ // // -static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; +static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; // // @@ -263,7 +263,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\ // // -static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; +static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; // // @@ -497,7 +497,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\ // // -static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; +static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s; // // |