aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js')
m---------src/bun.js/WebKit0
-rw-r--r--src/bun.js/api/server.zig295
-rw-r--r--src/bun.js/bindings/AsyncContextFrame.cpp8
-rw-r--r--src/bun.js/bindings/ZigGlobalObject.cpp14
-rw-r--r--src/bun.js/event_loop.zig49
-rw-r--r--src/bun.js/javascript.zig4
-rw-r--r--src/bun.js/webcore/body.zig19
-rw-r--r--src/bun.js/webcore/request.zig20
-rw-r--r--src/bun.js/webcore/response.zig108
-rw-r--r--src/bun.js/webcore/streams.zig125
10 files changed, 478 insertions, 164 deletions
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit
-Subproject fd79ce3120a692f4aed314c3da3dd452b4aa865
+Subproject 48c1316e907ca597e27e5a7624160dc18a4df8e
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 45c82b9fa..d6a9e1c6e 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1016,7 +1016,7 @@ fn NewFlags(comptime debug_mode: bool) type {
is_transfer_encoding: bool = false,
/// Used to identify if request can be safely deinitialized
- is_waiting_body: bool = false,
+ is_waiting_for_request_body: bool = false,
/// Used in renderMissing in debug mode to show the user an HTML page
/// Used to avoid looking at the uws.Request struct after it's been freed
is_web_browser_navigation: if (debug_mode) bool else void = if (debug_mode) false else {},
@@ -1080,9 +1080,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
/// When the response body is a temporary value
response_buf_owned: std.ArrayListUnmanaged(u8) = .{},
+ /// Defer finalization until after the request handler task is completed?
+ defer_deinit_until_callback_completes: ?*bool = null,
+
// TODO: support builtin compression
const can_sendfile = !ssl_enabled;
+ pub inline fn isAsync(this: *const RequestContext) bool {
+ return this.defer_deinit_until_callback_completes == null;
+ }
+
+ fn drainMicrotasks(this: *const RequestContext) void {
+ if (this.isAsync()) return;
+ this.server.vm.drainMicrotasks();
+ }
+
pub fn setAbortHandler(this: *RequestContext) void {
if (this.flags.has_abort_handler) return;
if (this.resp) |resp| {
@@ -1320,8 +1332,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.end(data, closeConnection);
@@ -1331,8 +1343,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endStream(closeConnection);
@@ -1342,8 +1354,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endWithoutBody(closeConnection);
@@ -1562,9 +1574,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// if we are waiting for the body yet and the request was not aborted we can safely clear the onData callback
if (this.resp) |resp| {
- if (this.flags.is_waiting_body and this.flags.aborted == false) {
+ if (this.flags.is_waiting_for_request_body and this.flags.aborted == false) {
resp.clearOnData();
- this.flags.is_waiting_body = false;
+ this.flags.is_waiting_for_request_body = false;
}
}
}
@@ -1576,6 +1588,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn deinit(this: *RequestContext) void {
+ if (this.defer_deinit_until_callback_completes) |defer_deinit| {
+ defer_deinit.* = true;
+ ctxLog("deferred deinit <d> ({*})<r>", .{this});
+ return;
+ }
+
ctxLog("deinit<d> ({*})<r>", .{this});
if (comptime Environment.allow_assert)
std.debug.assert(this.flags.finalized);
@@ -1953,6 +1971,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream };
+ fn handleFirstStreamWrite(this: *@This()) void {
+ if (!this.flags.has_written_status) {
+ this.renderMetadata();
+ }
+ }
+
fn doRenderStream(pair: *StreamPair) void {
var this = pair.this;
var stream = pair.stream;
@@ -1963,20 +1987,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
const resp = this.resp.?;
- // uWS automatically adds the status line if needed
- // we want to batch network calls as much as possible
- if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) {
- this.renderMetadata();
- }
-
stream.value.ensureStillAlive();
var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
+ var globalThis = this.server.globalThis;
response_stream.* = ResponseStream.JSSink{
.sink = .{
.res = resp,
.allocator = this.allocator,
.buffer = bun.ByteList{},
+ .onFirstWrite = @ptrCast(&handleFirstStreamWrite),
+ .ctx = this,
+ .globalThis = globalThis,
},
};
var signal = &response_stream.sink.signal;
@@ -1991,13 +2013,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// We are already corked!
const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
- this.server.globalThis,
+ globalThis,
stream.value,
response_stream,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
+
// assert that it was updated
std.debug.assert(!signal.isDead());
@@ -2015,32 +2038,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- stream.value.unprotect();
return this.handleReject(err_value);
}
- if (response_stream.sink.done or
- // TODO: is there a condition where resp could be freed before done?
- resp.hasResponded())
- {
+ if (resp.hasResponded()) {
if (!this.flags.aborted) resp.clearAborted();
- const wrote_anything = response_stream.sink.wrote > 0;
- streamLog("is done", .{});
- const responded = resp.hasResponded();
-
+ streamLog("done", .{});
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- if (!responded and !wrote_anything and !this.flags.aborted) {
- this.renderMissing();
- return;
- } else if (wrote_anything and !responded and !this.flags.aborted) {
- this.endStream(this.shouldCloseConnection());
- }
-
+ this.endStream(this.shouldCloseConnection());
this.finalize();
stream.value.unprotect();
-
return;
}
@@ -2049,19 +2058,28 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
streamLog("returned a promise", .{});
- switch (promise.status(this.server.globalThis.vm())) {
+ this.drainMicrotasks();
+
+ switch (promise.status(globalThis.vm())) {
.Pending => {
streamLog("promise still Pending", .{});
+ if (!this.flags.has_written_status) {
+ response_stream.sink.onFirstWrite = null;
+ response_stream.sink.ctx = null;
+ this.renderMetadata();
+ }
+
// TODO: should this timeout?
this.setAbortHandler();
+ this.pending_promises_for_abort += 1;
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = stream,
- .global = this.server.globalThis,
+ .global = globalThis,
},
};
assignment_result.then(
- this.server.globalThis,
+ globalThis,
this,
onResolveStream,
onRejectStream,
@@ -2071,11 +2089,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Fulfilled => {
streamLog("promise Fulfilled", .{});
+ defer stream.value.unprotect();
+
this.handleResolveStream();
},
.Rejected => {
streamLog("promise Rejected", .{});
- this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm()));
+ defer stream.value.unprotect();
+
+ this.handleRejectStream(globalThis, promise.result(globalThis.vm()));
},
}
return;
@@ -2084,22 +2106,23 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.flags.aborted) {
response_stream.detach();
- stream.cancel(this.server.globalThis);
- response_stream.sink.done = true;
+ stream.cancel(globalThis);
+ defer stream.value.unprotect();
+ response_stream.sink.markDone();
this.finalizeForAbort();
response_stream.sink.finalize();
- stream.value.unprotect();
return;
}
stream.value.ensureStillAlive();
+ defer stream.value.unprotect();
const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and
response_stream.sink.buffer.len == 0);
- if (!stream.isLocked(this.server.globalThis) and !is_in_progress) {
- if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| {
+ if (!stream.isLocked(globalThis) and !is_in_progress) {
+ if (JSC.WebCore.ReadableStream.fromJS(stream.value, globalThis)) |comparator| {
if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) {
streamLog("is not locked", .{});
this.renderMissing();
@@ -2111,7 +2134,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize();
- stream.value.unprotect();
}
const streamLog = Output.scoped(.ReadableStream, false);
@@ -2120,6 +2142,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return @intFromPtr(this.upgrade_context) == std.math.maxInt(usize);
}
+ fn toAsyncWithoutAbortHandler(ctx: *RequestContext, req: *uws.Request, request_object: *Request) void {
+ request_object.uws_request = req;
+
+ request_object.ensureURL() catch {
+ request_object.url = bun.String.empty;
+ };
+
+ // we have to clone the request headers here since they will soon belong to a different request
+ if (request_object.headers == null) {
+ request_object.headers = JSC.FetchHeaders.createFromUWS(ctx.server.globalThis, req);
+ }
+
+ // This object dies after the stack frame is popped
+ // so we have to clear it in here too
+ request_object.uws_request = null;
+ }
+
+ fn toAsync(
+ ctx: *RequestContext,
+ req: *uws.Request,
+ request_object: *Request,
+ ) void {
+ ctxLog("toAsync", .{});
+ ctx.toAsyncWithoutAbortHandler(req, request_object);
+ if (comptime debug_mode) {
+ ctx.pathname = request_object.url.clone();
+ }
+ ctx.setAbortHandler();
+ }
+
+ // Each HTTP request or TCP socket connection is effectively a "task".
+ //
+ // However, unlike the regular task queue, we don't drain the microtask
+ // queue at the end.
+ //
+ // Instead, we drain it multiple times, at the points that would
+ // otherwise "halt" the Response from being rendered.
+ //
+ // - If you return a Promise, we drain the microtask queue once
+ // - If you return a streaming Response, we drain the microtask queue (possibly the 2nd time this task!)
pub fn onResponse(
ctx: *RequestContext,
this: *ThisServer,
@@ -2128,8 +2190,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
request_value: JSValue,
response_value: JSValue,
) void {
+ _ = request_object;
+ _ = req;
request_value.ensureStillAlive();
response_value.ensureStillAlive();
+ ctx.drainMicrotasks();
if (ctx.flags.aborted) {
ctx.finalizeForAbort();
@@ -2159,6 +2224,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue = response_value;
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
@@ -2210,6 +2276,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
ctx.response_ptr = response;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
.Blob => |*blob| {
@@ -2236,35 +2303,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (wait_for_promise) {
- request_object.uws_request = req;
-
- request_object.ensureURL() catch {
- request_object.url = bun.String.empty;
- };
-
- // we have to clone the request headers here since they will soon belong to a different request
- if (request_object.headers == null) {
- request_object.headers = JSC.FetchHeaders.createFromUWS(this.globalThis, req);
- }
-
- if (comptime debug_mode) {
- ctx.pathname = request_object.url.clone();
- }
-
- // This object dies after the stack frame is popped
- // so we have to clear it in here too
- request_object.uws_request = null;
-
- ctx.setAbortHandler();
ctx.pending_promises_for_abort += 1;
-
response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject);
return;
}
- if (ctx.resp) |resp| {
- // The user returned something that wasn't a promise or a promise with a response
- if (!resp.hasResponded() and !ctx.flags.has_marked_pending) ctx.renderMissing();
- }
}
pub fn handleResolveStream(req: *RequestContext) void {
@@ -2276,6 +2318,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
wrapper.sink.done = true;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrote_anything = wrapper.sink.wrote > 0;
+
wrapper.sink.finalize();
wrapper.detach();
req.sink = null;
@@ -2300,12 +2343,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const responded = resp.hasResponded();
- if (!responded and !wrote_anything) {
- resp.clearAborted();
- req.renderMissing();
- return;
- } else if (!responded and wrote_anything) {
+ if (!responded) {
resp.clearAborted();
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
+ }
req.endStream(req.shouldCloseConnection());
}
@@ -2316,6 +2358,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onResolveStream", .{});
var args = callframe.arguments(2);
var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
req.handleResolveStream();
return JSValue.jsUndefined();
}
@@ -2323,19 +2366,19 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onRejectStream", .{});
const args = callframe.arguments(2);
var req = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
var err = args.ptr[0];
req.handleRejectStream(globalThis, err);
return JSValue.jsUndefined();
}
pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void {
+ _ = globalThis;
streamLog("handleRejectStream", .{});
- var wrote_anything = req.flags.has_written_status;
if (req.sink) |wrapper| {
wrapper.sink.pending_flush = null;
wrapper.sink.done = true;
- wrote_anything = wrote_anything or wrapper.sink.wrote > 0;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrapper.sink.finalize();
wrapper.detach();
@@ -2350,40 +2393,32 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
- streamLog("onReject({any})", .{wrote_anything});
-
- //aborted so call finalizeForAbort
+ // aborted so call finalizeForAbort
if (req.flags.aborted) {
req.finalizeForAbort();
return;
}
- if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) {
- req.response_jsvalue.unprotect();
- req.response_jsvalue = JSValue.zero;
- req.handleReject(err);
- return;
- } else if (wrote_anything) {
- req.endStream(true);
- if (comptime debug_mode) {
- if (!err.isEmptyOrUndefinedOrNull()) {
- var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
- defer exception_list.deinit();
- req.server.vm.runErrorHandler(err, &exception_list);
- }
- }
- req.finalize();
- return;
+ streamLog("onReject()", .{});
+
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
}
- const fallback = JSC.SystemError{
- .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
- .message = bun.String.static("Unhandled error in ReadableStream"),
- };
- req.handleReject(fallback.toErrorInstance(globalThis));
+ req.endStream(true);
+ if (comptime debug_mode) {
+ if (!err.isEmptyOrUndefinedOrNull()) {
+ var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
+ defer exception_list.deinit();
+ req.server.vm.runErrorHandler(err, &exception_list);
+ }
+ }
+ req.finalize();
}
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
+ this.drainMicrotasks();
+
// If a ReadableStream can trivially be converted to a Blob, do so.
// If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob.
value.toBlobIfPossible();
@@ -2833,8 +2868,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
std.debug.assert(this.resp == resp);
- this.flags.is_waiting_body = last == false;
+ this.flags.is_waiting_for_request_body = last == false;
if (this.flags.aborted or this.flags.has_marked_complete) return;
+ if (!last and chunk.len == 0) {
+ // Sometimes, we get back an empty chunk
+ // We have to ignore those chunks unless it's the last one
+ return;
+ }
if (this.request_body != null) {
var body = this.request_body.?;
@@ -2900,6 +2940,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (old == .Locked) {
+ defer this.drainMicrotasks();
+
old.resolve(&body.value, this.server.globalThis);
}
return;
@@ -3254,7 +3296,7 @@ pub const WebSocketServer = struct {
globalObject.throwInvalidArguments("websocket expects maxPayloadLength to be an integer", .{});
return null;
}
- server.maxPayloadLength = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.maxPayloadLength = @truncate(@max(value.toInt64(), 0));
}
}
@@ -3265,7 +3307,7 @@ pub const WebSocketServer = struct {
return null;
}
- var idleTimeout = @as(u16, @intCast(@as(u32, @truncate(@max(value.toInt64(), 0)))));
+ var idleTimeout: u16 = @truncate(@max(value.toInt64(), 0));
if (idleTimeout > 960) {
globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{});
return null;
@@ -3285,7 +3327,7 @@ pub const WebSocketServer = struct {
return null;
}
- server.backpressureLimit = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.backpressureLimit = @truncate(@max(value.toInt64(), 0));
}
}
@@ -5271,6 +5313,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
) void {
JSC.markBinding(@src());
this.pending_requests += 1;
+
req.setYield(false);
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
ctx.create(this, req, resp);
@@ -5337,7 +5380,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
.onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback,
},
};
- ctx.flags.is_waiting_body = true;
+ ctx.flags.is_waiting_for_request_body = true;
+
resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx);
}
}
@@ -5352,7 +5396,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5360,8 +5410,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ ctx.defer_deinit_until_callback_completes = null;
+
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn onWebSocketUpgrade(
@@ -5404,7 +5466,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
const request_value = args[0];
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5412,9 +5480,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
+ ctx.defer_deinit_until_callback_completes = null;
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn listen(this: *ThisServer) void {
diff --git a/src/bun.js/bindings/AsyncContextFrame.cpp b/src/bun.js/bindings/AsyncContextFrame.cpp
index 2a103a8d1..1c541b2a8 100644
--- a/src/bun.js/bindings/AsyncContextFrame.cpp
+++ b/src/bun.js/bindings/AsyncContextFrame.cpp
@@ -97,10 +97,18 @@ extern "C" EncodedJSValue AsyncContextFrame__withAsyncContextIfNeeded(JSGlobalOb
// }
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
}
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args, NakedPtr<Exception>& returnedException)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
}
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 0ecafeae4..d3bd623dd 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -1411,6 +1411,16 @@ JSC_DEFINE_HOST_FUNCTION(asyncHooksCleanupLater, (JSC::JSGlobalObject * globalOb
return JSC::JSValue::encode(JSC::jsUndefined());
}
+JSC_DEFINE_HOST_FUNCTION(asyncHooksSetEnabled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
+{
+ // assumptions and notes:
+ // - nobody else uses setOnEachMicrotaskTick
+ // - this is called by js if we set async context in a way we may not clear it
+ // - AsyncLocalStorage.prototype.run cleans up after itself and does not call this cb
+ globalObject->setAsyncContextTrackingEnabled(callFrame->argument(0).toBoolean(globalObject));
+ return JSC::JSValue::encode(JSC::jsUndefined());
+}
+
extern "C" int Bun__ttySetMode(int fd, int mode);
JSC_DEFINE_HOST_FUNCTION(jsTTYSetMode, (JSC::JSGlobalObject * globalObject, CallFrame* callFrame))
@@ -1689,6 +1699,10 @@ static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad,
if (string == "async_hooks"_s) {
auto* obj = constructEmptyObject(globalObject);
obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setAsyncHooksEnabled"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "setAsyncHooksEnabled"_s, asyncHooksSetEnabled, ImplementationVisibility::Public), 0);
+
+ obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "cleanupLater"_s)),
JSC::JSFunction::create(vm, globalObject, 0, "cleanupLater"_s, asyncHooksCleanupLater, ImplementationVisibility::Public), 0);
return JSValue::encode(obj);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 92874b6a4..896297060 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -509,6 +509,7 @@ comptime {
}
}
+pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool);
pub const EventLoop = struct {
tasks: Queue = undefined,
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
@@ -518,6 +519,7 @@ pub const EventLoop = struct {
start_server_on_next_tick: bool = false,
defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
forever_timer: ?*uws.Timer = null,
+ deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{},
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, false);
@@ -528,6 +530,49 @@ pub const EventLoop = struct {
}
}
+ pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void {
+ vm.drainMicrotasks();
+ this.drainDeferredTasks();
+ }
+
+ pub fn drainMicrotasks(this: *EventLoop) void {
+ this.drainMicrotasksWithVM(this.global.vm());
+ }
+
+ pub fn ensureAliveForOneTick(this: *EventLoop) void {
+ if (this.noop_task.scheduled) return;
+ this.enqueueTask(Task.init(&this.noop_task));
+ this.noop_task.scheduled = true;
+ }
+
+ pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool {
+ const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable;
+ return existing.found_existing;
+ }
+
+ pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool {
+ return this.deferred_microtask_map.swapRemove(ctx);
+ }
+
+ fn drainDeferredTasks(this: *EventLoop) void {
+ var i: usize = 0;
+ var last = this.deferred_microtask_map.count();
+ while (i < last) {
+ var key = this.deferred_microtask_map.keys()[i] orelse {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ continue;
+ };
+
+ if (!this.deferred_microtask_map.values()[i](key)) {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ } else {
+ i += 1;
+ }
+ }
+ }
+
pub fn tickWithCount(this: *EventLoop) u32 {
var global = this.global;
var global_vm = global.vm();
@@ -621,7 +666,7 @@ pub const EventLoop = struct {
}
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
}
this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head;
@@ -758,7 +803,7 @@ pub const EventLoop = struct {
this.tickConcurrent();
} else {
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index a016129e2..492b9fbee 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -1742,6 +1742,10 @@ pub const VirtualMachine = struct {
ret.success = true;
}
+ pub fn drainMicrotasks(this: *VirtualMachine) void {
+ this.eventLoop().drainMicrotasks();
+ }
+
pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {
switch (log.msgs.items.len) {
0 => {
diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig
index fa0ec9b24..86462dd04 100644
--- a/src/bun.js/webcore/body.zig
+++ b/src/bun.js/webcore/body.zig
@@ -249,10 +249,7 @@ pub const Body = struct {
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
value.action = action;
- if (value.readable) |readable| {
- // switch (readable.ptr) {
- // .JavaScript
- // }
+ if (value.readable) |readable| handle_stream: {
switch (action) {
.getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer => {
value.promise = switch (action) {
@@ -261,6 +258,20 @@ pub const Body = struct {
.getText => globalThis.readableStreamToText(readable.value),
.getBlob => globalThis.readableStreamToBlob(readable.value),
.getFormData => |form_data| brk: {
+ if (value.onStartBuffering != null) {
+ if (readable.isDisturbed(globalThis)) {
+ form_data.?.deinit();
+ readable.value.unprotect();
+ value.readable = null;
+ value.action = .{ .none = {} };
+ return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{}));
+ } else {
+ readable.detach(globalThis);
+ value.readable = null;
+ }
+
+ break :handle_stream;
+ }
defer {
form_data.?.deinit();
value.action.getFormData = null;
diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig
index c01e72d60..aaa3f6b79 100644
--- a/src/bun.js/webcore/request.zig
+++ b/src/bun.js/webcore/request.zig
@@ -485,7 +485,8 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
+
if (!req.url.isEmpty())
fields.insert(.url);
} else if (!url_or_object_type.isObject()) {
@@ -554,7 +555,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (!response.url.isEmpty()) {
- req.url = response.url;
+ req.url = response.url.dupeRef();
fields.insert(.url);
}
}
@@ -586,7 +587,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (value.fastGet(globalThis, .url)) |url| {
- req.url = bun.String.fromJS(url, globalThis);
+ req.url = bun.String.fromJS(url, globalThis).dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
@@ -599,7 +600,7 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
}
@@ -648,9 +649,10 @@ pub const Request = struct {
return null;
}
- // Note that the string is going to be ref'd here, so we don't need to ref it above.
const href = JSC.URL.hrefFromString(req.url);
if (href.isEmpty()) {
+ // globalThis.throw can cause GC, which could cause the above string to be freed.
+ // so we must increment the reference count before calling it.
globalThis.throw("Failed to construct 'Request': Invalid URL \"{}\"", .{
req.url,
});
@@ -658,6 +660,14 @@ pub const Request = struct {
_ = req.body.unref();
return null;
}
+
+ // hrefFromString increments the reference count if they end up being
+ // the same
+ //
+ // we increment the reference count on usage above, so we must
+ // decrement it to be perfectly balanced.
+ req.url.deref();
+
req.url = href;
if (req.body.value == .Blob and
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 01ecfad36..d947a7d4e 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -617,6 +617,7 @@ pub const Fetch = struct {
http: ?*HTTPClient.AsyncHTTP = null,
result: HTTPClient.HTTPClientResult = .{},
+ metadata: ?HTTPClient.HTTPClientResult.ResultMetadata = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
@@ -641,7 +642,8 @@ pub const Fetch = struct {
url_proxy_buffer: []const u8 = "",
signal: ?*JSC.WebCore.AbortSignal = null,
- aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ signals: HTTPClient.Signals = .{},
+ signal_store: HTTPClient.Signals.Store = .{},
has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
@@ -702,11 +704,19 @@ pub const Fetch = struct {
this.request_headers.entries.deinit(bun.default_allocator);
this.request_headers.buf.deinit(bun.default_allocator);
this.request_headers = Headers{ .allocator = undefined };
- this.http.?.clearData();
- this.result.deinitMetadata();
+ if (this.http != null) {
+ this.http.?.clearData();
+ }
+
+ if (this.metadata != null) {
+ this.metadata.?.deinit();
+ this.metadata = null;
+ }
+
this.response_buffer.deinit();
this.response.deinit();
+
this.scheduled_response_buffer.deinit();
this.request_body.detach();
@@ -725,9 +735,13 @@ pub const Fetch = struct {
}
pub fn onBodyReceived(this: *FetchTasklet) void {
+ this.mutex.lock();
const success = this.result.isSuccess();
const globalThis = this.global_this;
defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+
if (!success or !this.result.has_more) {
var vm = globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -831,43 +845,42 @@ pub const Fetch = struct {
pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
- this.mutex.lock();
- defer {
- this.has_schedule_callback.store(false, .Monotonic);
- this.mutex.unlock();
- }
-
if (this.is_waiting_body) {
return this.onBodyReceived();
}
+ this.mutex.lock();
const globalThis = this.global_this;
var ref = this.promise;
const promise_value = ref.value();
- defer ref.strong.deinit();
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
poll_ref.unref(vm);
this.clearData();
this.deinit();
return;
}
+ const promise = promise_value.asAnyPromise().?;
+ const tracker = this.tracker;
+ tracker.willDispatch(globalThis);
defer {
+ tracker.didDispatch(globalThis);
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
if (!this.is_waiting_body) {
poll_ref.unref(vm);
this.clearData();
this.deinit();
}
}
-
- const promise = promise_value.asAnyPromise().?;
- const tracker = this.tracker;
- tracker.willDispatch(globalThis);
- defer tracker.didDispatch(globalThis);
const success = this.result.isSuccess();
const result = switch (success) {
true => this.onResolve(),
@@ -907,6 +920,16 @@ pub const Fetch = struct {
return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
}
+ var path: bun.String = undefined;
+
+ if (this.metadata) |metadata| {
+ path = bun.String.create(metadata.href);
+ } else if (this.http) |http| {
+ path = bun.String.create(http.url.href);
+ } else {
+ path = bun.String.empty;
+ }
+
const fetch_error = JSC.SystemError{
.code = bun.String.static(@errorName(this.result.fail)),
.message = switch (this.result.fail) {
@@ -916,7 +939,7 @@ pub const Fetch = struct {
error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"),
else => bun.String.static("fetch() failed. For more information, pass `verbose: true` in the second argument to fetch()"),
},
- .path = bun.String.create(this.http.?.url.href),
+ .path = path,
};
return fetch_error.toErrorInstance(this.global_this);
@@ -927,7 +950,7 @@ pub const Fetch = struct {
if (this.http) |http| {
http.enableBodyStreaming();
}
- if (this.aborted.load(.Acquire)) {
+ if (this.signal_store.aborted.load(.Monotonic)) {
return JSC.WebCore.DrainResult{
.aborted = {},
};
@@ -1000,21 +1023,27 @@ pub const Fetch = struct {
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
- const http_response = this.result.response;
- this.is_waiting_body = this.result.has_more;
- return Response{
- .allocator = allocator,
- .url = bun.String.createAtomIfPossible(this.result.href),
- .status_text = bun.String.createAtomIfPossible(http_response.status),
- .redirected = this.result.redirected,
- .body = .{
- .init = .{
- .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
- .status_code = @as(u16, @truncate(http_response.status_code)),
+ // at this point we always should have metadata
+ std.debug.assert(this.metadata != null);
+ if (this.metadata) |metadata| {
+ const http_response = metadata.response;
+ this.is_waiting_body = this.result.has_more;
+ return Response{
+ .allocator = allocator,
+ .url = bun.String.createAtomIfPossible(metadata.href),
+ .status_text = bun.String.createAtomIfPossible(http_response.status),
+ .redirected = this.result.redirected,
+ .body = .{
+ .init = .{
+ .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
+ .status_code = @as(u16, @truncate(http_response.status_code)),
+ },
+ .value = this.toBodyValue(),
},
- .value = this.toBodyValue(),
- },
- };
+ };
+ }
+
+ @panic("fetch metadata should be provided");
}
pub fn onResolve(this: *FetchTasklet) JSValue {
@@ -1063,6 +1092,7 @@ pub const Fetch = struct {
.hostname = fetch_options.hostname,
.tracker = JSC.AsyncTaskTracker.init(jsc_vm),
};
+ fetch_tasklet.signals = fetch_tasklet.signal_store.to();
fetch_tasklet.tracker.didSchedule(globalThis);
@@ -1079,6 +1109,10 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
+ if (fetch_tasklet.signal == null) {
+ fetch_tasklet.signals.aborted = null;
+ }
+
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
allocator,
fetch_options.method,
@@ -1095,9 +1129,10 @@ pub const Fetch = struct {
fetch_tasklet,
),
proxy,
- if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
+
fetch_options.hostname,
fetch_options.redirect_type,
+ fetch_tasklet.signals,
);
if (fetch_options.redirect_type != FetchRedirect.follow) {
@@ -1108,7 +1143,7 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
// we wanna to return after headers are received
- fetch_tasklet.http.?.signalHeaderProgress();
+ fetch_tasklet.signal_store.header_progress.store(true, .Monotonic);
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -1127,7 +1162,7 @@ pub const Fetch = struct {
reason.ensureStillAlive();
this.abort_reason = reason;
reason.protect();
- this.aborted.store(true, .Monotonic);
+ this.signal_store.aborted.store(true, .Monotonic);
this.tracker.didCancel(this.global_this);
if (this.http != null) {
@@ -1180,11 +1215,14 @@ pub const Fetch = struct {
task.mutex.lock();
defer task.mutex.unlock();
task.result = result;
+ // metadata should be provided only once so we preserve it until we consume it
+ if (result.metadata) |metadata| {
+ task.metadata = metadata;
+ }
task.body_size = result.body_size;
const success = result.isSuccess();
task.response_buffer = result.body.?.*;
-
if (success) {
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 955d10ffb..771d34db0 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1915,6 +1915,34 @@ pub const ArrayBufferSink = struct {
pub const JSSink = NewJSSink(@This(), "ArrayBufferSink");
};
+const AutoFlusher = struct {
+ registered: bool = false,
+
+ pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (this.auto_flusher.registered) return;
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (!this.auto_flusher.registered) return;
+ this.auto_flusher.registered = false;
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(this.auto_flusher.registered);
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ this.auto_flusher.registered = false;
+ }
+
+ pub fn registerDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(!this.auto_flusher.registered);
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+};
+
pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
return struct {
sink: SinkType,
@@ -2357,6 +2385,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
end_len: usize = 0,
aborted: bool = false,
+ onFirstWrite: ?*const fn (?*anyopaque) void = null,
+ ctx: ?*anyopaque = null,
+
+ auto_flusher: AutoFlusher = AutoFlusher{},
+
const log = Output.scoped(.HTTPServerWritable, false);
pub fn connect(this: *@This(), signal: Signal) void {
@@ -2375,15 +2408,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
}
+ fn handleFirstWriteIfNecessary(this: *@This()) void {
+ if (this.onFirstWrite) |onFirstWrite| {
+ var ctx = this.ctx;
+ this.ctx = null;
+ this.onFirstWrite = null;
+ onFirstWrite(ctx);
+ }
+ }
+
fn hasBackpressure(this: *const @This()) bool {
return this.has_backpressure;
}
- fn send(this: *@This(), buf: []const u8) bool {
+ fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
+ this.handleFirstWriteIfNecessary();
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
return success;
@@ -2395,10 +2438,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// so in this scenario, we just append to the buffer
// and report success
if (this.requested_end) {
+ this.handleFirstWriteIfNecessary();
this.res.end(buf, false);
this.has_backpressure = false;
return true;
} else {
+ this.handleFirstWriteIfNecessary();
this.has_backpressure = !this.res.write(buf);
if (this.has_backpressure) {
this.res.onWritable(*@This(), onWritable, this);
@@ -2409,6 +2454,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
unreachable;
}
+ fn send(this: *@This(), buf: []const u8) bool {
+ this.unregisterAutoFlusher();
+ return this.sendWithoutAutoFlusher(buf);
+ }
+
fn readableSlice(this: *@This()) []const u8 {
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}
@@ -2464,7 +2514,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Node.Maybe(void) {
if (this.aborted or this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
return .{ .result = {} };
}
@@ -2529,6 +2579,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
log("flushFromJS({any})", .{wait});
+ this.unregisterAutoFlusher();
+
if (!wait) {
return this.flushFromJSNoWait();
}
@@ -2563,12 +2615,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flush(this: *@This()) JSC.Node.Maybe(void) {
log("flush()", .{});
+ this.unregisterAutoFlusher();
+
if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
if (this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
}
@@ -2596,6 +2650,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
} else if (this.buffer.len + len >= this.highWaterMark) {
// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
@@ -2613,9 +2668,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2628,7 +2685,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2676,9 +2733,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2690,7 +2749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2715,9 +2774,15 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.onWritable(*@This(), onWritable, this);
}
+ this.registerAutoFlusher();
return .{ .owned = @as(Blob.SizeType, @intCast(written)) };
}
+ pub fn markDone(this: *@This()) void {
+ this.done = true;
+ this.unregisterAutoFlusher();
+ }
+
// In this case, it's always an error
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) {
log("end({any})", .{err});
@@ -2728,7 +2793,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = {} };
}
@@ -2739,7 +2804,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
// we do not close the stream here
// this.res.endStream(false);
this.finalize();
@@ -2759,7 +2824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.requested_end = true;
this.signal.close(null);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -2780,10 +2845,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.end("", false);
}
- this.done = true;
+ this.markDone();
this.flushPromise();
this.signal.close(null);
- this.done = true;
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
@@ -2796,12 +2860,50 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn abort(this: *@This()) void {
log("onAborted()", .{});
this.done = true;
+ this.unregisterAutoFlusher();
+
this.aborted = true;
this.signal.close(null);
+
this.flushPromise();
this.finalize();
}
+ fn unregisterAutoFlusher(this: *@This()) void {
+ if (this.auto_flusher.registered)
+ AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ fn registerAutoFlusher(this: *@This()) void {
+ if (!this.auto_flusher.registered)
+ AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ pub fn onAutoFlush(this: *@This()) bool {
+ log("onAutoFlush()", .{});
+ if (this.done) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ const readable = this.readableSlice();
+
+ if (this.hasBackpressure() or readable.len == 0) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ if (!this.sendWithoutAutoFlusher(readable)) {
+ this.auto_flusher.registered = true;
+ this.res.onWritable(*@This(), onWritable, this);
+ return true;
+ }
+
+ this.handleWrote(readable.len);
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
pub fn destroy(this: *@This()) void {
log("destroy()", .{});
var bytes = this.buffer.listManaged(this.allocator);
@@ -2810,6 +2912,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
bytes.deinit();
}
+ this.unregisterAutoFlusher();
+
this.allocator.destroy(this);
}
@@ -2820,6 +2924,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (!this.done) {
this.done = true;
+ this.unregisterAutoFlusher();
this.res.endStream(false);
}