aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
m---------src/bun.js/WebKit0
-rw-r--r--src/bun.js/api/server.zig295
-rw-r--r--src/bun.js/bindings/AsyncContextFrame.cpp8
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp14
-rw-r--r--src/bun.js/event_loop.zig49
-rw-r--r--src/bun.js/javascript.zig4
-rw-r--r--src/bun.js/webcore/body.zig19
-rw-r--r--src/bun.js/webcore/request.zig20
-rw-r--r--src/bun.js/webcore/response.zig108
-rw-r--r--src/bun.js/webcore/streams.zig125
-rw-r--r--src/cli/test_command.zig4
-rw-r--r--src/http_client_async.zig229
-rw-r--r--src/install/install.zig4
-rw-r--r--src/js/builtins/ReadableStreamDefaultReader.ts14
-rw-r--r--src/js/node/async_hooks.ts7
-rw-r--r--src/js/out/InternalModuleRegistryConstants.h6
16 files changed, 637 insertions, 269 deletions
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit
-Subproject fd79ce3120a692f4aed314c3da3dd452b4aa865
+Subproject 48c1316e907ca597e27e5a7624160dc18a4df8e
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 45c82b9fa..d6a9e1c6e 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1016,7 +1016,7 @@ fn NewFlags(comptime debug_mode: bool) type {
is_transfer_encoding: bool = false,
/// Used to identify if request can be safely deinitialized
- is_waiting_body: bool = false,
+ is_waiting_for_request_body: bool = false,
/// Used in renderMissing in debug mode to show the user an HTML page
/// Used to avoid looking at the uws.Request struct after it's been freed
is_web_browser_navigation: if (debug_mode) bool else void = if (debug_mode) false else {},
@@ -1080,9 +1080,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
/// When the response body is a temporary value
response_buf_owned: std.ArrayListUnmanaged(u8) = .{},
+ /// Defer finalization until after the request handler task is completed?
+ defer_deinit_until_callback_completes: ?*bool = null,
+
// TODO: support builtin compression
const can_sendfile = !ssl_enabled;
+ pub inline fn isAsync(this: *const RequestContext) bool {
+ return this.defer_deinit_until_callback_completes == null;
+ }
+
+ fn drainMicrotasks(this: *const RequestContext) void {
+ if (this.isAsync()) return;
+ this.server.vm.drainMicrotasks();
+ }
+
pub fn setAbortHandler(this: *RequestContext) void {
if (this.flags.has_abort_handler) return;
if (this.resp) |resp| {
@@ -1320,8 +1332,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.end(data, closeConnection);
@@ -1331,8 +1343,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endStream(closeConnection);
@@ -1342,8 +1354,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endWithoutBody(closeConnection);
@@ -1562,9 +1574,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// if we are waiting for the body yet and the request was not aborted we can safely clear the onData callback
if (this.resp) |resp| {
- if (this.flags.is_waiting_body and this.flags.aborted == false) {
+ if (this.flags.is_waiting_for_request_body and this.flags.aborted == false) {
resp.clearOnData();
- this.flags.is_waiting_body = false;
+ this.flags.is_waiting_for_request_body = false;
}
}
}
@@ -1576,6 +1588,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn deinit(this: *RequestContext) void {
+ if (this.defer_deinit_until_callback_completes) |defer_deinit| {
+ defer_deinit.* = true;
+ ctxLog("deferred deinit <d> ({*})<r>", .{this});
+ return;
+ }
+
ctxLog("deinit<d> ({*})<r>", .{this});
if (comptime Environment.allow_assert)
std.debug.assert(this.flags.finalized);
@@ -1953,6 +1971,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream };
+ fn handleFirstStreamWrite(this: *@This()) void {
+ if (!this.flags.has_written_status) {
+ this.renderMetadata();
+ }
+ }
+
fn doRenderStream(pair: *StreamPair) void {
var this = pair.this;
var stream = pair.stream;
@@ -1963,20 +1987,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
const resp = this.resp.?;
- // uWS automatically adds the status line if needed
- // we want to batch network calls as much as possible
- if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) {
- this.renderMetadata();
- }
-
stream.value.ensureStillAlive();
var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
+ var globalThis = this.server.globalThis;
response_stream.* = ResponseStream.JSSink{
.sink = .{
.res = resp,
.allocator = this.allocator,
.buffer = bun.ByteList{},
+ .onFirstWrite = @ptrCast(&handleFirstStreamWrite),
+ .ctx = this,
+ .globalThis = globalThis,
},
};
var signal = &response_stream.sink.signal;
@@ -1991,13 +2013,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// We are already corked!
const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
- this.server.globalThis,
+ globalThis,
stream.value,
response_stream,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
+
// assert that it was updated
std.debug.assert(!signal.isDead());
@@ -2015,32 +2038,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- stream.value.unprotect();
return this.handleReject(err_value);
}
- if (response_stream.sink.done or
- // TODO: is there a condition where resp could be freed before done?
- resp.hasResponded())
- {
+ if (resp.hasResponded()) {
if (!this.flags.aborted) resp.clearAborted();
- const wrote_anything = response_stream.sink.wrote > 0;
- streamLog("is done", .{});
- const responded = resp.hasResponded();
-
+ streamLog("done", .{});
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- if (!responded and !wrote_anything and !this.flags.aborted) {
- this.renderMissing();
- return;
- } else if (wrote_anything and !responded and !this.flags.aborted) {
- this.endStream(this.shouldCloseConnection());
- }
-
+ this.endStream(this.shouldCloseConnection());
this.finalize();
stream.value.unprotect();
-
return;
}
@@ -2049,19 +2058,28 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
streamLog("returned a promise", .{});
- switch (promise.status(this.server.globalThis.vm())) {
+ this.drainMicrotasks();
+
+ switch (promise.status(globalThis.vm())) {
.Pending => {
streamLog("promise still Pending", .{});
+ if (!this.flags.has_written_status) {
+ response_stream.sink.onFirstWrite = null;
+ response_stream.sink.ctx = null;
+ this.renderMetadata();
+ }
+
// TODO: should this timeout?
this.setAbortHandler();
+ this.pending_promises_for_abort += 1;
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = stream,
- .global = this.server.globalThis,
+ .global = globalThis,
},
};
assignment_result.then(
- this.server.globalThis,
+ globalThis,
this,
onResolveStream,
onRejectStream,
@@ -2071,11 +2089,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Fulfilled => {
streamLog("promise Fulfilled", .{});
+ defer stream.value.unprotect();
+
this.handleResolveStream();
},
.Rejected => {
streamLog("promise Rejected", .{});
- this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm()));
+ defer stream.value.unprotect();
+
+ this.handleRejectStream(globalThis, promise.result(globalThis.vm()));
},
}
return;
@@ -2084,22 +2106,23 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.flags.aborted) {
response_stream.detach();
- stream.cancel(this.server.globalThis);
- response_stream.sink.done = true;
+ stream.cancel(globalThis);
+ defer stream.value.unprotect();
+ response_stream.sink.markDone();
this.finalizeForAbort();
response_stream.sink.finalize();
- stream.value.unprotect();
return;
}
stream.value.ensureStillAlive();
+ defer stream.value.unprotect();
const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and
response_stream.sink.buffer.len == 0);
- if (!stream.isLocked(this.server.globalThis) and !is_in_progress) {
- if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| {
+ if (!stream.isLocked(globalThis) and !is_in_progress) {
+ if (JSC.WebCore.ReadableStream.fromJS(stream.value, globalThis)) |comparator| {
if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) {
streamLog("is not locked", .{});
this.renderMissing();
@@ -2111,7 +2134,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize();
- stream.value.unprotect();
}
const streamLog = Output.scoped(.ReadableStream, false);
@@ -2120,6 +2142,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return @intFromPtr(this.upgrade_context) == std.math.maxInt(usize);
}
+ fn toAsyncWithoutAbortHandler(ctx: *RequestContext, req: *uws.Request, request_object: *Request) void {
+ request_object.uws_request = req;
+
+ request_object.ensureURL() catch {
+ request_object.url = bun.String.empty;
+ };
+
+ // we have to clone the request headers here since they will soon belong to a different request
+ if (request_object.headers == null) {
+ request_object.headers = JSC.FetchHeaders.createFromUWS(ctx.server.globalThis, req);
+ }
+
+ // This object dies after the stack frame is popped
+ // so we have to clear it in here too
+ request_object.uws_request = null;
+ }
+
+ fn toAsync(
+ ctx: *RequestContext,
+ req: *uws.Request,
+ request_object: *Request,
+ ) void {
+ ctxLog("toAsync", .{});
+ ctx.toAsyncWithoutAbortHandler(req, request_object);
+ if (comptime debug_mode) {
+ ctx.pathname = request_object.url.clone();
+ }
+ ctx.setAbortHandler();
+ }
+
+ // Each HTTP request or TCP socket connection is effectively a "task".
+ //
+ // However, unlike the regular task queue, we don't drain the microtask
+ // queue at the end.
+ //
+ // Instead, we drain it multiple times, at the points that would
+ // otherwise "halt" the Response from being rendered.
+ //
+ // - If you return a Promise, we drain the microtask queue once
+ // - If you return a streaming Response, we drain the microtask queue (possibly the 2nd time this task!)
pub fn onResponse(
ctx: *RequestContext,
this: *ThisServer,
@@ -2128,8 +2190,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
request_value: JSValue,
response_value: JSValue,
) void {
+ _ = request_object;
+ _ = req;
request_value.ensureStillAlive();
response_value.ensureStillAlive();
+ ctx.drainMicrotasks();
if (ctx.flags.aborted) {
ctx.finalizeForAbort();
@@ -2159,6 +2224,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue = response_value;
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
@@ -2210,6 +2276,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
ctx.response_ptr = response;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
.Blob => |*blob| {
@@ -2236,35 +2303,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (wait_for_promise) {
- request_object.uws_request = req;
-
- request_object.ensureURL() catch {
- request_object.url = bun.String.empty;
- };
-
- // we have to clone the request headers here since they will soon belong to a different request
- if (request_object.headers == null) {
- request_object.headers = JSC.FetchHeaders.createFromUWS(this.globalThis, req);
- }
-
- if (comptime debug_mode) {
- ctx.pathname = request_object.url.clone();
- }
-
- // This object dies after the stack frame is popped
- // so we have to clear it in here too
- request_object.uws_request = null;
-
- ctx.setAbortHandler();
ctx.pending_promises_for_abort += 1;
-
response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject);
return;
}
- if (ctx.resp) |resp| {
- // The user returned something that wasn't a promise or a promise with a response
- if (!resp.hasResponded() and !ctx.flags.has_marked_pending) ctx.renderMissing();
- }
}
pub fn handleResolveStream(req: *RequestContext) void {
@@ -2276,6 +2318,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
wrapper.sink.done = true;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrote_anything = wrapper.sink.wrote > 0;
+
wrapper.sink.finalize();
wrapper.detach();
req.sink = null;
@@ -2300,12 +2343,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const responded = resp.hasResponded();
- if (!responded and !wrote_anything) {
- resp.clearAborted();
- req.renderMissing();
- return;
- } else if (!responded and wrote_anything) {
+ if (!responded) {
resp.clearAborted();
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
+ }
req.endStream(req.shouldCloseConnection());
}
@@ -2316,6 +2358,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onResolveStream", .{});
var args = callframe.arguments(2);
var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
req.handleResolveStream();
return JSValue.jsUndefined();
}
@@ -2323,19 +2366,19 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onRejectStream", .{});
const args = callframe.arguments(2);
var req = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
var err = args.ptr[0];
req.handleRejectStream(globalThis, err);
return JSValue.jsUndefined();
}
pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void {
+ _ = globalThis;
streamLog("handleRejectStream", .{});
- var wrote_anything = req.flags.has_written_status;
if (req.sink) |wrapper| {
wrapper.sink.pending_flush = null;
wrapper.sink.done = true;
- wrote_anything = wrote_anything or wrapper.sink.wrote > 0;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrapper.sink.finalize();
wrapper.detach();
@@ -2350,40 +2393,32 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
- streamLog("onReject({any})", .{wrote_anything});
-
- //aborted so call finalizeForAbort
+ // aborted so call finalizeForAbort
if (req.flags.aborted) {
req.finalizeForAbort();
return;
}
- if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) {
- req.response_jsvalue.unprotect();
- req.response_jsvalue = JSValue.zero;
- req.handleReject(err);
- return;
- } else if (wrote_anything) {
- req.endStream(true);
- if (comptime debug_mode) {
- if (!err.isEmptyOrUndefinedOrNull()) {
- var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
- defer exception_list.deinit();
- req.server.vm.runErrorHandler(err, &exception_list);
- }
- }
- req.finalize();
- return;
+ streamLog("onReject()", .{});
+
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
}
- const fallback = JSC.SystemError{
- .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
- .message = bun.String.static("Unhandled error in ReadableStream"),
- };
- req.handleReject(fallback.toErrorInstance(globalThis));
+ req.endStream(true);
+ if (comptime debug_mode) {
+ if (!err.isEmptyOrUndefinedOrNull()) {
+ var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
+ defer exception_list.deinit();
+ req.server.vm.runErrorHandler(err, &exception_list);
+ }
+ }
+ req.finalize();
}
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
+ this.drainMicrotasks();
+
// If a ReadableStream can trivially be converted to a Blob, do so.
// If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob.
value.toBlobIfPossible();
@@ -2833,8 +2868,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
std.debug.assert(this.resp == resp);
- this.flags.is_waiting_body = last == false;
+ this.flags.is_waiting_for_request_body = last == false;
if (this.flags.aborted or this.flags.has_marked_complete) return;
+ if (!last and chunk.len == 0) {
+ // Sometimes, we get back an empty chunk
+ // We have to ignore those chunks unless it's the last one
+ return;
+ }
if (this.request_body != null) {
var body = this.request_body.?;
@@ -2900,6 +2940,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (old == .Locked) {
+ defer this.drainMicrotasks();
+
old.resolve(&body.value, this.server.globalThis);
}
return;
@@ -3254,7 +3296,7 @@ pub const WebSocketServer = struct {
globalObject.throwInvalidArguments("websocket expects maxPayloadLength to be an integer", .{});
return null;
}
- server.maxPayloadLength = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.maxPayloadLength = @truncate(@max(value.toInt64(), 0));
}
}
@@ -3265,7 +3307,7 @@ pub const WebSocketServer = struct {
return null;
}
- var idleTimeout = @as(u16, @intCast(@as(u32, @truncate(@max(value.toInt64(), 0)))));
+ var idleTimeout: u16 = @truncate(@max(value.toInt64(), 0));
if (idleTimeout > 960) {
globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{});
return null;
@@ -3285,7 +3327,7 @@ pub const WebSocketServer = struct {
return null;
}
- server.backpressureLimit = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.backpressureLimit = @truncate(@max(value.toInt64(), 0));
}
}
@@ -5271,6 +5313,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
) void {
JSC.markBinding(@src());
this.pending_requests += 1;
+
req.setYield(false);
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
ctx.create(this, req, resp);
@@ -5337,7 +5380,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
.onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback,
},
};
- ctx.flags.is_waiting_body = true;
+ ctx.flags.is_waiting_for_request_body = true;
+
resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx);
}
}
@@ -5352,7 +5396,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5360,8 +5410,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ ctx.defer_deinit_until_callback_completes = null;
+
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn onWebSocketUpgrade(
@@ -5404,7 +5466,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
const request_value = args[0];
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5412,9 +5480,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
+ ctx.defer_deinit_until_callback_completes = null;
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn listen(this: *ThisServer) void {
diff --git a/src/bun.js/bindings/AsyncContextFrame.cpp b/src/bun.js/bindings/AsyncContextFrame.cpp
index 2a103a8d1..1c541b2a8 100644
--- a/src/bun.js/bindings/AsyncContextFrame.cpp
+++ b/src/bun.js/bindings/AsyncContextFrame.cpp
@@ -97,10 +97,18 @@ extern "C" EncodedJSValue AsyncContextFrame__withAsyncContextIfNeeded(JSGlobalOb
// }
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
}
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args, NakedPtr<Exception>& returnedException)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
}
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 0ecafeae4..d3bd623dd 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -1411,6 +1411,16 @@ JSC_DEFINE_HOST_FUNCTION(asyncHooksCleanupLater, (JSC::JSGlobalObject * globalOb
return JSC::JSValue::encode(JSC::jsUndefined());
}
+JSC_DEFINE_HOST_FUNCTION(asyncHooksSetEnabled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
+{
+ // assumptions and notes:
+ // - nobody else uses setOnEachMicrotaskTick
+ // - this is called by js if we set async context in a way we may not clear it
+ // - AsyncLocalStorage.prototype.run cleans up after itself and does not call this cb
+ globalObject->setAsyncContextTrackingEnabled(callFrame->argument(0).toBoolean(globalObject));
+ return JSC::JSValue::encode(JSC::jsUndefined());
+}
+
extern "C" int Bun__ttySetMode(int fd, int mode);
JSC_DEFINE_HOST_FUNCTION(jsTTYSetMode, (JSC::JSGlobalObject * globalObject, CallFrame* callFrame))
@@ -1689,6 +1699,10 @@ static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad,
if (string == "async_hooks"_s) {
auto* obj = constructEmptyObject(globalObject);
obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setAsyncHooksEnabled"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "setAsyncHooksEnabled"_s, asyncHooksSetEnabled, ImplementationVisibility::Public), 0);
+
+ obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "cleanupLater"_s)),
JSC::JSFunction::create(vm, globalObject, 0, "cleanupLater"_s, asyncHooksCleanupLater, ImplementationVisibility::Public), 0);
return JSValue::encode(obj);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 92874b6a4..896297060 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -509,6 +509,7 @@ comptime {
}
}
+pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool);
pub const EventLoop = struct {
tasks: Queue = undefined,
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
@@ -518,6 +519,7 @@ pub const EventLoop = struct {
start_server_on_next_tick: bool = false,
defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
forever_timer: ?*uws.Timer = null,
+ deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{},
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, false);
@@ -528,6 +530,49 @@ pub const EventLoop = struct {
}
}
+ pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void {
+ vm.drainMicrotasks();
+ this.drainDeferredTasks();
+ }
+
+ pub fn drainMicrotasks(this: *EventLoop) void {
+ this.drainMicrotasksWithVM(this.global.vm());
+ }
+
+ pub fn ensureAliveForOneTick(this: *EventLoop) void {
+ if (this.noop_task.scheduled) return;
+ this.enqueueTask(Task.init(&this.noop_task));
+ this.noop_task.scheduled = true;
+ }
+
+ pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool {
+ const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable;
+ return existing.found_existing;
+ }
+
+ pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool {
+ return this.deferred_microtask_map.swapRemove(ctx);
+ }
+
+ fn drainDeferredTasks(this: *EventLoop) void {
+ var i: usize = 0;
+ var last = this.deferred_microtask_map.count();
+ while (i < last) {
+ var key = this.deferred_microtask_map.keys()[i] orelse {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ continue;
+ };
+
+ if (!this.deferred_microtask_map.values()[i](key)) {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ } else {
+ i += 1;
+ }
+ }
+ }
+
pub fn tickWithCount(this: *EventLoop) u32 {
var global = this.global;
var global_vm = global.vm();
@@ -621,7 +666,7 @@ pub const EventLoop = struct {
}
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
}
this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head;
@@ -758,7 +803,7 @@ pub const EventLoop = struct {
this.tickConcurrent();
} else {
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index a016129e2..492b9fbee 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -1742,6 +1742,10 @@ pub const VirtualMachine = struct {
ret.success = true;
}
+ pub fn drainMicrotasks(this: *VirtualMachine) void {
+ this.eventLoop().drainMicrotasks();
+ }
+
pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {
switch (log.msgs.items.len) {
0 => {
diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig
index fa0ec9b24..86462dd04 100644
--- a/src/bun.js/webcore/body.zig
+++ b/src/bun.js/webcore/body.zig
@@ -249,10 +249,7 @@ pub const Body = struct {
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
value.action = action;
- if (value.readable) |readable| {
- // switch (readable.ptr) {
- // .JavaScript
- // }
+ if (value.readable) |readable| handle_stream: {
switch (action) {
.getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer => {
value.promise = switch (action) {
@@ -261,6 +258,20 @@ pub const Body = struct {
.getText => globalThis.readableStreamToText(readable.value),
.getBlob => globalThis.readableStreamToBlob(readable.value),
.getFormData => |form_data| brk: {
+ if (value.onStartBuffering != null) {
+ if (readable.isDisturbed(globalThis)) {
+ form_data.?.deinit();
+ readable.value.unprotect();
+ value.readable = null;
+ value.action = .{ .none = {} };
+ return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{}));
+ } else {
+ readable.detach(globalThis);
+ value.readable = null;
+ }
+
+ break :handle_stream;
+ }
defer {
form_data.?.deinit();
value.action.getFormData = null;
diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig
index c01e72d60..aaa3f6b79 100644
--- a/src/bun.js/webcore/request.zig
+++ b/src/bun.js/webcore/request.zig
@@ -485,7 +485,8 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
+
if (!req.url.isEmpty())
fields.insert(.url);
} else if (!url_or_object_type.isObject()) {
@@ -554,7 +555,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (!response.url.isEmpty()) {
- req.url = response.url;
+ req.url = response.url.dupeRef();
fields.insert(.url);
}
}
@@ -586,7 +587,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (value.fastGet(globalThis, .url)) |url| {
- req.url = bun.String.fromJS(url, globalThis);
+ req.url = bun.String.fromJS(url, globalThis).dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
@@ -599,7 +600,7 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
}
@@ -648,9 +649,10 @@ pub const Request = struct {
return null;
}
- // Note that the string is going to be ref'd here, so we don't need to ref it above.
const href = JSC.URL.hrefFromString(req.url);
if (href.isEmpty()) {
+ // globalThis.throw can cause GC, which could cause the above string to be freed.
+ // so we must increment the reference count before calling it.
globalThis.throw("Failed to construct 'Request': Invalid URL \"{}\"", .{
req.url,
});
@@ -658,6 +660,14 @@ pub const Request = struct {
_ = req.body.unref();
return null;
}
+
+ // hrefFromString increments the reference count if they end up being
+ // the same
+ //
+ // we increment the reference count on usage above, so we must
+ // decrement it to be perfectly balanced.
+ req.url.deref();
+
req.url = href;
if (req.body.value == .Blob and
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 01ecfad36..d947a7d4e 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -617,6 +617,7 @@ pub const Fetch = struct {
http: ?*HTTPClient.AsyncHTTP = null,
result: HTTPClient.HTTPClientResult = .{},
+ metadata: ?HTTPClient.HTTPClientResult.ResultMetadata = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
@@ -641,7 +642,8 @@ pub const Fetch = struct {
url_proxy_buffer: []const u8 = "",
signal: ?*JSC.WebCore.AbortSignal = null,
- aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ signals: HTTPClient.Signals = .{},
+ signal_store: HTTPClient.Signals.Store = .{},
has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
@@ -702,11 +704,19 @@ pub const Fetch = struct {
this.request_headers.entries.deinit(bun.default_allocator);
this.request_headers.buf.deinit(bun.default_allocator);
this.request_headers = Headers{ .allocator = undefined };
- this.http.?.clearData();
- this.result.deinitMetadata();
+ if (this.http != null) {
+ this.http.?.clearData();
+ }
+
+ if (this.metadata != null) {
+ this.metadata.?.deinit();
+ this.metadata = null;
+ }
+
this.response_buffer.deinit();
this.response.deinit();
+
this.scheduled_response_buffer.deinit();
this.request_body.detach();
@@ -725,9 +735,13 @@ pub const Fetch = struct {
}
pub fn onBodyReceived(this: *FetchTasklet) void {
+ this.mutex.lock();
const success = this.result.isSuccess();
const globalThis = this.global_this;
defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+
if (!success or !this.result.has_more) {
var vm = globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -831,43 +845,42 @@ pub const Fetch = struct {
pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
- this.mutex.lock();
- defer {
- this.has_schedule_callback.store(false, .Monotonic);
- this.mutex.unlock();
- }
-
if (this.is_waiting_body) {
return this.onBodyReceived();
}
+ this.mutex.lock();
const globalThis = this.global_this;
var ref = this.promise;
const promise_value = ref.value();
- defer ref.strong.deinit();
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
poll_ref.unref(vm);
this.clearData();
this.deinit();
return;
}
+ const promise = promise_value.asAnyPromise().?;
+ const tracker = this.tracker;
+ tracker.willDispatch(globalThis);
defer {
+ tracker.didDispatch(globalThis);
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
if (!this.is_waiting_body) {
poll_ref.unref(vm);
this.clearData();
this.deinit();
}
}
-
- const promise = promise_value.asAnyPromise().?;
- const tracker = this.tracker;
- tracker.willDispatch(globalThis);
- defer tracker.didDispatch(globalThis);
const success = this.result.isSuccess();
const result = switch (success) {
true => this.onResolve(),
@@ -907,6 +920,16 @@ pub const Fetch = struct {
return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
}
+ var path: bun.String = undefined;
+
+ if (this.metadata) |metadata| {
+ path = bun.String.create(metadata.href);
+ } else if (this.http) |http| {
+ path = bun.String.create(http.url.href);
+ } else {
+ path = bun.String.empty;
+ }
+
const fetch_error = JSC.SystemError{
.code = bun.String.static(@errorName(this.result.fail)),
.message = switch (this.result.fail) {
@@ -916,7 +939,7 @@ pub const Fetch = struct {
error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"),
else => bun.String.static("fetch() failed. For more information, pass `verbose: true` in the second argument to fetch()"),
},
- .path = bun.String.create(this.http.?.url.href),
+ .path = path,
};
return fetch_error.toErrorInstance(this.global_this);
@@ -927,7 +950,7 @@ pub const Fetch = struct {
if (this.http) |http| {
http.enableBodyStreaming();
}
- if (this.aborted.load(.Acquire)) {
+ if (this.signal_store.aborted.load(.Monotonic)) {
return JSC.WebCore.DrainResult{
.aborted = {},
};
@@ -1000,21 +1023,27 @@ pub const Fetch = struct {
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
- const http_response = this.result.response;
- this.is_waiting_body = this.result.has_more;
- return Response{
- .allocator = allocator,
- .url = bun.String.createAtomIfPossible(this.result.href),
- .status_text = bun.String.createAtomIfPossible(http_response.status),
- .redirected = this.result.redirected,
- .body = .{
- .init = .{
- .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
- .status_code = @as(u16, @truncate(http_response.status_code)),
+ // at this point we always should have metadata
+ std.debug.assert(this.metadata != null);
+ if (this.metadata) |metadata| {
+ const http_response = metadata.response;
+ this.is_waiting_body = this.result.has_more;
+ return Response{
+ .allocator = allocator,
+ .url = bun.String.createAtomIfPossible(metadata.href),
+ .status_text = bun.String.createAtomIfPossible(http_response.status),
+ .redirected = this.result.redirected,
+ .body = .{
+ .init = .{
+ .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
+ .status_code = @as(u16, @truncate(http_response.status_code)),
+ },
+ .value = this.toBodyValue(),
},
- .value = this.toBodyValue(),
- },
- };
+ };
+ }
+
+ @panic("fetch metadata should be provided");
}
pub fn onResolve(this: *FetchTasklet) JSValue {
@@ -1063,6 +1092,7 @@ pub const Fetch = struct {
.hostname = fetch_options.hostname,
.tracker = JSC.AsyncTaskTracker.init(jsc_vm),
};
+ fetch_tasklet.signals = fetch_tasklet.signal_store.to();
fetch_tasklet.tracker.didSchedule(globalThis);
@@ -1079,6 +1109,10 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
+ if (fetch_tasklet.signal == null) {
+ fetch_tasklet.signals.aborted = null;
+ }
+
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
allocator,
fetch_options.method,
@@ -1095,9 +1129,10 @@ pub const Fetch = struct {
fetch_tasklet,
),
proxy,
- if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
+
fetch_options.hostname,
fetch_options.redirect_type,
+ fetch_tasklet.signals,
);
if (fetch_options.redirect_type != FetchRedirect.follow) {
@@ -1108,7 +1143,7 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
// we wanna to return after headers are received
- fetch_tasklet.http.?.signalHeaderProgress();
+ fetch_tasklet.signal_store.header_progress.store(true, .Monotonic);
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -1127,7 +1162,7 @@ pub const Fetch = struct {
reason.ensureStillAlive();
this.abort_reason = reason;
reason.protect();
- this.aborted.store(true, .Monotonic);
+ this.signal_store.aborted.store(true, .Monotonic);
this.tracker.didCancel(this.global_this);
if (this.http != null) {
@@ -1180,11 +1215,14 @@ pub const Fetch = struct {
task.mutex.lock();
defer task.mutex.unlock();
task.result = result;
+ // metadata should be provided only once so we preserve it until we consume it
+ if (result.metadata) |metadata| {
+ task.metadata = metadata;
+ }
task.body_size = result.body_size;
const success = result.isSuccess();
task.response_buffer = result.body.?.*;
-
if (success) {
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 955d10ffb..771d34db0 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1915,6 +1915,34 @@ pub const ArrayBufferSink = struct {
pub const JSSink = NewJSSink(@This(), "ArrayBufferSink");
};
+const AutoFlusher = struct {
+ registered: bool = false,
+
+ pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (this.auto_flusher.registered) return;
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (!this.auto_flusher.registered) return;
+ this.auto_flusher.registered = false;
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(this.auto_flusher.registered);
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ this.auto_flusher.registered = false;
+ }
+
+ pub fn registerDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(!this.auto_flusher.registered);
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+};
+
pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
return struct {
sink: SinkType,
@@ -2357,6 +2385,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
end_len: usize = 0,
aborted: bool = false,
+ onFirstWrite: ?*const fn (?*anyopaque) void = null,
+ ctx: ?*anyopaque = null,
+
+ auto_flusher: AutoFlusher = AutoFlusher{},
+
const log = Output.scoped(.HTTPServerWritable, false);
pub fn connect(this: *@This(), signal: Signal) void {
@@ -2375,15 +2408,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
}
+ fn handleFirstWriteIfNecessary(this: *@This()) void {
+ if (this.onFirstWrite) |onFirstWrite| {
+ var ctx = this.ctx;
+ this.ctx = null;
+ this.onFirstWrite = null;
+ onFirstWrite(ctx);
+ }
+ }
+
fn hasBackpressure(this: *const @This()) bool {
return this.has_backpressure;
}
- fn send(this: *@This(), buf: []const u8) bool {
+ fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
+ this.handleFirstWriteIfNecessary();
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
return success;
@@ -2395,10 +2438,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// so in this scenario, we just append to the buffer
// and report success
if (this.requested_end) {
+ this.handleFirstWriteIfNecessary();
this.res.end(buf, false);
this.has_backpressure = false;
return true;
} else {
+ this.handleFirstWriteIfNecessary();
this.has_backpressure = !this.res.write(buf);
if (this.has_backpressure) {
this.res.onWritable(*@This(), onWritable, this);
@@ -2409,6 +2454,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
unreachable;
}
+ fn send(this: *@This(), buf: []const u8) bool {
+ this.unregisterAutoFlusher();
+ return this.sendWithoutAutoFlusher(buf);
+ }
+
fn readableSlice(this: *@This()) []const u8 {
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}
@@ -2464,7 +2514,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Node.Maybe(void) {
if (this.aborted or this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
return .{ .result = {} };
}
@@ -2529,6 +2579,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
log("flushFromJS({any})", .{wait});
+ this.unregisterAutoFlusher();
+
if (!wait) {
return this.flushFromJSNoWait();
}
@@ -2563,12 +2615,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flush(this: *@This()) JSC.Node.Maybe(void) {
log("flush()", .{});
+ this.unregisterAutoFlusher();
+
if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
if (this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
}
@@ -2596,6 +2650,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
} else if (this.buffer.len + len >= this.highWaterMark) {
// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
@@ -2613,9 +2668,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2628,7 +2685,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2676,9 +2733,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2690,7 +2749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2715,9 +2774,15 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.onWritable(*@This(), onWritable, this);
}
+ this.registerAutoFlusher();
return .{ .owned = @as(Blob.SizeType, @intCast(written)) };
}
+ pub fn markDone(this: *@This()) void {
+ this.done = true;
+ this.unregisterAutoFlusher();
+ }
+
// In this case, it's always an error
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) {
log("end({any})", .{err});
@@ -2728,7 +2793,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = {} };
}
@@ -2739,7 +2804,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
// we do not close the stream here
// this.res.endStream(false);
this.finalize();
@@ -2759,7 +2824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.requested_end = true;
this.signal.close(null);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -2780,10 +2845,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.end("", false);
}
- this.done = true;
+ this.markDone();
this.flushPromise();
this.signal.close(null);
- this.done = true;
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
@@ -2796,12 +2860,50 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn abort(this: *@This()) void {
log("onAborted()", .{});
this.done = true;
+ this.unregisterAutoFlusher();
+
this.aborted = true;
this.signal.close(null);
+
this.flushPromise();
this.finalize();
}
+ fn unregisterAutoFlusher(this: *@This()) void {
+ if (this.auto_flusher.registered)
+ AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ fn registerAutoFlusher(this: *@This()) void {
+ if (!this.auto_flusher.registered)
+ AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ pub fn onAutoFlush(this: *@This()) bool {
+ log("onAutoFlush()", .{});
+ if (this.done) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ const readable = this.readableSlice();
+
+ if (this.hasBackpressure() or readable.len == 0) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ if (!this.sendWithoutAutoFlusher(readable)) {
+ this.auto_flusher.registered = true;
+ this.res.onWritable(*@This(), onWritable, this);
+ return true;
+ }
+
+ this.handleWrote(readable.len);
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
pub fn destroy(this: *@This()) void {
log("destroy()", .{});
var bytes = this.buffer.listManaged(this.allocator);
@@ -2810,6 +2912,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
bytes.deinit();
}
+ this.unregisterAutoFlusher();
+
this.allocator.destroy(this);
}
@@ -2820,6 +2924,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (!this.done) {
this.done = true;
+ this.unregisterAutoFlusher();
this.res.endStream(false);
}
diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig
index 5bf48d7d9..53dd4c3c5 100644
--- a/src/cli/test_command.zig
+++ b/src/cli/test_command.zig
@@ -955,12 +955,12 @@ pub const TestCommand = struct {
}
{
- vm.global.vm().drainMicrotasks();
+ vm.drainMicrotasks();
var count = vm.unhandled_error_counter;
vm.global.handleRejectedPromises();
while (vm.unhandled_error_counter > count) {
count = vm.unhandled_error_counter;
- vm.global.vm().drainMicrotasks();
+ vm.drainMicrotasks();
vm.global.handleRejectedPromises();
}
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 725e960d6..26978db22 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -60,6 +60,31 @@ var shared_response_headers_buf: [256]picohttp.Header = undefined;
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
+pub const Signals = struct {
+ header_progress: ?*std.atomic.Atomic(bool) = null,
+ body_streaming: ?*std.atomic.Atomic(bool) = null,
+ aborted: ?*std.atomic.Atomic(bool) = null,
+
+ pub const Store = struct {
+ header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+
+ pub fn to(this: *Store) Signals {
+ return .{
+ .header_progress = &this.header_progress,
+ .body_streaming = &this.body_streaming,
+ .aborted = &this.aborted,
+ };
+ }
+ };
+
+ pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool {
+ var ptr: *std.atomic.Atomic(bool) = @field(this, @tagName(field)) orelse return false;
+ return ptr.load(.Monotonic);
+ }
+};
+
pub const FetchRedirect = enum(u8) {
follow,
manual,
@@ -761,12 +786,12 @@ pub fn onOpen(
std.debug.assert(is_ssl == client.url.isHTTPS());
}
}
- if (client.aborted != null) {
+ if (client.signals.aborted != null) {
socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
}
log("Connected {s} \n", .{client.url.href});
- if (client.hasSignalAborted()) {
+ if (client.signals.get(.aborted)) {
client.closeAndAbort(comptime is_ssl, socket);
return;
}
@@ -1012,6 +1037,7 @@ pub const InternalState = struct {
fail: anyerror = error.NoError,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
+ metadata_sent: bool = false,
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
@@ -1153,13 +1179,17 @@ pub const InternalState = struct {
}
pub fn postProcessBody(this: *InternalState) usize {
- var response = &this.pending_response;
- // if it compressed with this header, it is no longer
- if (this.content_encoding_i < response.headers.len) {
- var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
- _ = mutable_headers.orderedRemove(this.content_encoding_i);
- response.headers = mutable_headers.items;
- this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
+
+ // we only touch it if we did not sent the headers yet
+ if (!this.metadata_sent) {
+ var response = &this.pending_response;
+ if (this.content_encoding_i < response.headers.len) {
+ // if it compressed with this header, it is no longer
+ var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
+ _ = mutable_headers.orderedRemove(this.content_encoding_i);
+ response.headers = mutable_headers.items;
+ this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
+ }
}
return this.body_out_str.?.list.items.len;
@@ -1201,11 +1231,9 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
proxy_tunnel: ?ProxyTunnel = null,
-aborted: ?*std.atomic.Atomic(bool) = null,
+signals: Signals = .{},
async_http_id: u32 = 0,
hostname: ?[]u8 = null,
-signal_header_progress: *std.atomic.Atomic(bool),
-enable_body_stream: *std.atomic.Atomic(bool),
pub fn init(
allocator: std.mem.Allocator,
@@ -1213,10 +1241,8 @@ pub fn init(
url: URL,
header_entries: Headers.Entries,
header_buf: string,
- signal: ?*std.atomic.Atomic(bool),
hostname: ?[]u8,
- signal_header_progress: *std.atomic.Atomic(bool),
- enable_body_stream: *std.atomic.Atomic(bool),
+ signals: Signals,
) HTTPClient {
return HTTPClient{
.allocator = allocator,
@@ -1224,10 +1250,8 @@ pub fn init(
.url = url,
.header_entries = header_entries,
.header_buf = header_buf,
- .aborted = signal,
.hostname = hostname,
- .signal_header_progress = signal_header_progress,
- .enable_body_stream = enable_body_stream,
+ .signals = signals,
};
}
@@ -1384,8 +1408,7 @@ pub const AsyncHTTP = struct {
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
- signal_header_progress: std.atomic.Atomic(bool),
- enable_body_stream: std.atomic.Atomic(bool),
+ signals: Signals = .{},
pub var active_requests_count = std.atomic.Atomic(usize).init(0);
pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
@@ -1418,12 +1441,14 @@ pub const AsyncHTTP = struct {
pub fn signalHeaderProgress(this: *AsyncHTTP) void {
@fence(.Release);
- this.client.signal_header_progress.store(true, .Release);
+ var progress = this.signals.header_progress orelse return;
+ progress.store(true, .Release);
}
pub fn enableBodyStreaming(this: *AsyncHTTP) void {
@fence(.Release);
- this.client.enable_body_stream.store(true, .Release);
+ var stream = this.signals.body_streaming orelse return;
+ stream.store(true, .Release);
}
pub fn clearData(this: *AsyncHTTP) void {
@@ -1453,9 +1478,9 @@ pub const AsyncHTTP = struct {
timeout: usize,
callback: HTTPClientResult.Callback,
http_proxy: ?URL,
- signal: ?*std.atomic.Atomic(bool),
hostname: ?[]u8,
redirect_type: FetchRedirect,
+ signals: ?Signals,
) AsyncHTTP {
var this = AsyncHTTP{
.allocator = allocator,
@@ -1467,12 +1492,11 @@ pub const AsyncHTTP = struct {
.response_buffer = response_buffer,
.result_callback = callback,
.http_proxy = http_proxy,
- .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
- .signal_header_progress = std.atomic.Atomic(bool).init(false),
- .enable_body_stream = std.atomic.Atomic(bool).init(false),
+ .signals = signals orelse .{},
+ .async_http_id = if (signals != null and signals.?.aborted != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
};
- this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname, &this.signal_header_progress, &this.enable_body_stream);
+ this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, hostname, signals orelse this.signals);
this.client.async_http_id = this.async_http_id;
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
@@ -1544,7 +1568,21 @@ pub const AsyncHTTP = struct {
}
pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL, hostname: ?[]u8, redirect_type: FetchRedirect) AsyncHTTP {
- return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy, null, hostname, redirect_type);
+ return @This().init(
+ allocator,
+ method,
+ url,
+ headers,
+ headers_buf,
+ response_buffer,
+ request_body,
+ timeout,
+ undefined,
+ http_proxy,
+ hostname,
+ redirect_type,
+ null,
+ );
}
fn reset(this: *AsyncHTTP) !void {
@@ -1646,8 +1684,10 @@ pub const AsyncHTTP = struct {
if (!result.isSuccess()) {
return result.fail;
}
-
- return result.response;
+ std.debug.assert(result.metadata != null);
+ if (result.metadata) |metadata| {
+ return metadata.response;
+ }
}
unreachable;
@@ -1659,33 +1699,34 @@ pub const AsyncHTTP = struct {
var callback = this.result_callback;
this.elapsed = http_thread.timer.read() -| this.elapsed;
this.redirected = this.client.remaining_redirect_count != default_redirect_count;
- if (!result.isSuccess()) {
+ if (result.isSuccess()) {
+ this.err = null;
+ if (result.metadata) |metadata| {
+ this.response = metadata.response;
+ }
+ this.state.store(.success, .Monotonic);
+ } else {
this.err = result.fail;
this.response = null;
this.state.store(State.fail, .Monotonic);
- } else {
- this.err = null;
- this.response = result.response;
- this.state.store(.success, .Monotonic);
}
if (result.has_more) {
callback.function(callback.ctx, result);
} else {
- this.client.deinit();
-
- this.real.?.* = this.*;
- this.real.?.response_buffer = this.response_buffer;
-
- log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+ {
+ this.client.deinit();
+ defer default_allocator.destroy(this);
+ this.real.?.* = this.*;
+ this.real.?.response_buffer = this.response_buffer;
- default_allocator.destroy(this);
+ log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+ callback.function(callback.ctx, result);
+ }
const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
std.debug.assert(active_requests > 0);
- callback.function(callback.ctx, result);
-
if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
http_thread.drainEvents();
}
@@ -1715,10 +1756,6 @@ pub const AsyncHTTP = struct {
}
};
-pub fn hasSignalAborted(this: *const HTTPClient) bool {
- return (this.aborted orelse return false).load(.Monotonic);
-}
-
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1842,7 +1879,7 @@ pub fn doRedirect(this: *HTTPClient) void {
tunnel.deinit();
this.proxy_tunnel = null;
}
- if (this.aborted != null) {
+ if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
return this.start(.{ .bytes = "" }, body_out_str);
@@ -1874,7 +1911,7 @@ pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableStr
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
// Aborted before connecting
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.fail(error.Aborted);
return;
}
@@ -1912,7 +1949,7 @@ fn printResponse(response: picohttp.Response) void {
}
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
return;
}
@@ -2256,7 +2293,7 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
log("onData {}", .{incoming_data.len});
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
return;
}
@@ -2359,7 +2396,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
if (body_buf.len == 0) {
// no body data yet, but we can report the headers
- if (this.signal_header_progress.load(.Acquire)) {
+ if (this.signals.get(.header_progress)) {
this.progressUpdate(is_ssl, ctx, socket);
}
return;
@@ -2393,7 +2430,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
// if not reported we report partially now
- if (this.signal_header_progress.load(.Acquire)) {
+ if (this.signals.get(.header_progress)) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
@@ -2512,7 +2549,7 @@ pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo
}
fn fail(this: *HTTPClient, err: anyerror) void {
- if (this.aborted != null) {
+ if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
this.state.request_stage = .fail;
@@ -2561,7 +2598,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
if (this.state.stage != .done and this.state.stage != .fail) {
const is_done = this.state.isDone();
- if (this.aborted != null and is_done) {
+ if (this.signals.aborted != null and is_done) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
@@ -2585,22 +2622,17 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
} else if (!socket.isClosed()) {
socket.close(0, null);
}
+
this.state.reset();
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
this.proxy_tunneling = false;
- if (comptime print_every > 0) {
- print_every_i += 1;
- if (print_every_i % print_every == 0) {
- Output.prettyln("Heap stats for HTTP thread\n", .{});
- Output.flush();
- default_arena.dumpThreadStats();
- print_every_i = 0;
- }
- }
}
+
result.body.?.* = body;
+ callback.run(result);
+
if (comptime print_every > 0) {
print_every_i += 1;
if (print_every_i % print_every == 0) {
@@ -2610,25 +2642,39 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
print_every_i = 0;
}
}
- callback.run(result);
}
}
pub const HTTPClientResult = struct {
body: ?*MutableString = null,
- response: picohttp.Response = .{},
- metadata_buf: []u8 = &.{},
- href: []const u8 = "",
- fail: anyerror = error.NoError,
- redirected: bool = false,
- headers_buf: []picohttp.Header = &.{},
has_more: bool = false,
+ fail: anyerror = error.NoError,
+
+ metadata: ?ResultMetadata = null,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
/// If is not chunked encoded and Content-Length is not provided this will be unknown
body_size: BodySize = .unknown,
+ redirected: bool = false,
+
+ pub const ResultMetadata = struct {
+ response: picohttp.Response = .{},
+ metadata_buf: []u8 = &.{},
+ href: []const u8 = "",
+ headers_buf: []picohttp.Header = &.{},
+
+ pub fn deinit(this: *ResultMetadata) void {
+ if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
+ if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
+ this.headers_buf = &.{};
+ this.metadata_buf = &.{};
+ this.href = "";
+ this.response.headers = &.{};
+ this.response.status = "";
+ }
+ };
pub const BodySize = union(enum) {
total_received: usize,
@@ -2648,17 +2694,6 @@ pub const HTTPClientResult = struct {
return this.fail == error.Aborted;
}
- pub fn deinitMetadata(this: *HTTPClientResult) void {
- if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
- if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
-
- this.headers_buf = &.{};
- this.metadata_buf = &.{};
- this.href = "";
- this.response.headers = &.{};
- this.response.status = "";
- }
-
pub const Callback = struct {
ctx: *anyopaque,
function: Function,
@@ -2694,14 +2729,26 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
.{ .content_length = content_length }
else
.{ .unknown = {} };
+ if (!this.state.metadata_sent) {
+ this.state.metadata_sent = true;
+ return HTTPClientResult{
+ .metadata = .{
+ .response = metadata.response,
+ .metadata_buf = metadata.owned_buf,
+ .href = metadata.url,
+ .headers_buf = metadata.response.headers,
+ },
+ .body = this.state.body_out_str,
+ .redirected = this.remaining_redirect_count != default_redirect_count,
+ .fail = this.state.fail,
+ .has_more = this.state.fail == error.NoError and !this.state.isDone(),
+ .body_size = body_size,
+ };
+ }
return HTTPClientResult{
.body = this.state.body_out_str,
- .response = metadata.response,
- .metadata_buf = metadata.owned_buf,
- .redirected = this.remaining_redirect_count != default_redirect_count,
- .href = metadata.url,
+ .metadata = null,
.fail = this.state.fail,
- .headers_buf = metadata.response.headers,
.has_more = this.state.fail == error.NoError and !this.state.isDone(),
.body_size = body_size,
};
@@ -2786,7 +2833,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
// done or streaming
const is_done = this.state.total_body_received >= content_length;
- if (is_done or this.enable_body_stream.load(.Acquire)) {
+ if (is_done or this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer.*);
if (this.progress_node) |progress| {
@@ -2848,7 +2895,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
progress.context.maybeRefresh();
}
// streaming chunks
- if (this.enable_body_stream.load(.Acquire)) {
+ if (this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer);
return processed > 0;
}
@@ -2927,7 +2974,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
try body_buffer.appendSliceExact(buffer);
// streaming chunks
- if (this.enable_body_stream.load(.Acquire)) {
+ if (this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(body_buffer.*);
return processed > 0;
}
diff --git a/src/install/install.zig b/src/install/install.zig
index d444b62fc..b0cdf35c8 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -370,8 +370,8 @@ const NetworkTask = struct {
this.getCompletionCallback(),
this.package_manager.httpProxy(url),
null,
- null,
HTTP.FetchRedirect.follow,
+ null,
);
this.callback = .{
.package_manifest = .{
@@ -448,8 +448,8 @@ const NetworkTask = struct {
this.getCompletionCallback(),
this.package_manager.httpProxy(url),
null,
- null,
HTTP.FetchRedirect.follow,
+ null,
);
this.callback = .{ .extract = tarball };
}
diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts
index ea1a64b68..169806c52 100644
--- a/src/js/builtins/ReadableStreamDefaultReader.ts
+++ b/src/js/builtins/ReadableStreamDefaultReader.ts
@@ -104,9 +104,11 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
if ($getByIdDirectPrivate(controller, "closeRequested"))
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
- else if ($isReadableStreamDefaultController(controller))
+ else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
- else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+ } else if ($isReadableByteStreamController(controller)) {
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ }
return { value: outValues, size, done: false };
}
@@ -138,11 +140,13 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
var size = queue.size;
$resetQueue(queue);
- if ($getByIdDirectPrivate(controller, "closeRequested"))
+ if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
- else if ($isReadableStreamDefaultController(controller))
+ } else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
- else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+ } else if ($isReadableByteStreamController(controller)) {
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ }
return { value: value, size: size, done: false };
};
diff --git a/src/js/node/async_hooks.ts b/src/js/node/async_hooks.ts
index 2a671b6a2..d04b226f8 100644
--- a/src/js/node/async_hooks.ts
+++ b/src/js/node/async_hooks.ts
@@ -21,7 +21,7 @@
// AsyncContextData is an immutable array managed in here, formatted [key, value, key, value] where
// each key is an AsyncLocalStorage object and the value is the associated value.
//
-const { cleanupLater } = $lazy("async_hooks");
+const { cleanupLater, setAsyncHooksEnabled } = $lazy("async_hooks");
function get(): ReadonlyArray<any> | undefined {
return $getInternalField($asyncContext, 0);
@@ -34,7 +34,9 @@ function set(contextValue: ReadonlyArray<any> | undefined) {
class AsyncLocalStorage {
#disableCalled = false;
- constructor() {}
+ constructor() {
+ setAsyncHooksEnabled(true);
+ }
static bind(fn, ...args: any) {
return this.snapshot().bind(null, fn, ...args);
@@ -160,6 +162,7 @@ class AsyncResource {
if (typeof type !== "string") {
throw new TypeError('The "type" argument must be of type string. Received type ' + typeof type);
}
+ setAsyncHooksEnabled(true);
this.type = type;
this.#snapshot = get();
}
diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h
index 20a7693b1..dea913eb4 100644
--- a/src/js/out/InternalModuleRegistryConstants.h
+++ b/src/js/out/InternalModuleRegistryConstants.h
@@ -30,7 +30,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//
@@ -263,7 +263,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//
@@ -497,7 +497,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//