aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-16 00:53:03 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-09-16 00:53:03 -0700
commit0ce709d96abb48c747f5c93033c9a80fe79ee3bc (patch)
treef535a53c23fd95154b36ceab7c38c8e3a0275c89 /src
parentfd808dec524c60ba18c620e27b205828760a6e41 (diff)
downloadbun-0ce709d96abb48c747f5c93033c9a80fe79ee3bc.tar.gz
bun-0ce709d96abb48c747f5c93033c9a80fe79ee3bc.tar.zst
bun-0ce709d96abb48c747f5c93033c9a80fe79ee3bc.zip
Make new HTTP client more stable
Diffstat (limited to 'src')
-rw-r--r--src/analytics/analytics_thread.zig2
-rw-r--r--src/bun.js/api/bun.zig34
-rw-r--r--src/bun.js/api/server.zig50
-rw-r--r--src/bun.js/base.zig41
-rw-r--r--src/bun.js/bindings/JSBuffer.cpp7
-rw-r--r--src/bun.js/event_loop.zig254
-rw-r--r--src/bun.js/javascript.zig19
-rw-r--r--src/bun.js/unbounded_queue.zig149
-rw-r--r--src/bun.js/webcore/response.zig280
-rw-r--r--src/bun.js/webcore/streams.zig3
-rw-r--r--src/bun_js.zig15
-rw-r--r--src/cli/create_command.zig30
-rw-r--r--src/cli/test_command.zig6
-rw-r--r--src/cli/upgrade_command.zig6
-rw-r--r--src/deps/uws.zig89
-rw-r--r--src/feature_flags.zig1
-rw-r--r--src/global.zig2
-rw-r--r--src/http/websocket_http_client.zig10
-rw-r--r--src/http_client_async.zig464
-rw-r--r--src/install/install.zig6
-rw-r--r--src/mimalloc_arena.zig5
-rw-r--r--src/napi/napi.zig52
-rw-r--r--src/pool.zig8
-rw-r--r--src/thread_pool.zig2
24 files changed, 915 insertions, 620 deletions
diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig
index 82f435025..75cd3165b 100644
--- a/src/analytics/analytics_thread.zig
+++ b/src/analytics/analytics_thread.zig
@@ -404,7 +404,7 @@ fn readloop() anyerror!void {
headers_entries,
headers_buf,
&out_buffer,
- &event_list.in_buffer,
+ "",
std.time.ns_per_ms * 10000,
) catch return;
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index a37d5d62c..3fafdc177 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -1101,6 +1101,9 @@ pub const Class = NewClass(
.nanoseconds = .{
.rfn = nanoseconds,
},
+ .DO_NOT_USE_OR_YOU_WILL_BE_FIRED_mimalloc_dump = .{
+ .rfn = dump_mimalloc,
+ },
.gzipSync = .{
.rfn = JSC.wrapWithHasContainer(JSZlib, "gzipSync", false, false, true),
},
@@ -1191,6 +1194,18 @@ pub const Class = NewClass(
},
);
+fn dump_mimalloc(
+ _: void,
+ globalThis: JSC.C.JSContextRef,
+ _: JSC.C.JSObjectRef,
+ _: JSC.C.JSObjectRef,
+ _: []const JSC.C.JSValueRef,
+ _: JSC.C.ExceptionRef,
+) JSC.C.JSValueRef {
+ globalThis.bunVM().arena.dumpThreadStats();
+ return JSC.JSValue.jsUndefined().asObjectRef();
+}
+
pub const Crypto = struct {
const Hashers = @import("../../sha.zig");
@@ -2101,6 +2116,8 @@ pub const Timer = struct {
return VirtualMachine.vm.timer.last_id;
}
+ const Pool = bun.ObjectPool(Timeout, null, true, 1000);
+
pub const Timeout = struct {
id: i32 = 0,
callback: JSValue,
@@ -2134,11 +2151,13 @@ pub const Timer = struct {
if (comptime JSC.is_bindgen)
unreachable;
+ var vm = global.bunVM();
+
if (!this.cancelled) {
if (this.repeat) {
this.io_task.?.deinit();
- var task = Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, global, this) catch unreachable;
- VirtualMachine.vm.timer.timeouts.put(VirtualMachine.vm.allocator, this.id, this) catch unreachable;
+ var task = Timeout.TimeoutTask.createOnJSThread(vm.allocator, global, this) catch unreachable;
+ vm.timer.timeouts.put(vm.allocator, this.id, this) catch unreachable;
this.io_task = task;
task.schedule();
}
@@ -2148,12 +2167,12 @@ pub const Timer = struct {
if (this.repeat)
return;
- VirtualMachine.vm.timer.active -|= 1;
- VirtualMachine.vm.active_tasks -|= 1;
+ vm.timer.active -|= 1;
+ vm.active_tasks -|= 1;
} else {
// the active tasks count is already cleared for canceled timeout,
// add one here to neutralize the `-|= 1` in event loop.
- VirtualMachine.vm.active_tasks +|= 1;
+ vm.active_tasks +|= 1;
}
this.clear(global);
@@ -2168,8 +2187,9 @@ pub const Timer = struct {
_ = VirtualMachine.vm.timer.timeouts.swapRemove(this.id);
if (this.io_task) |task| {
task.deinit();
+ this.io_task = null;
}
- VirtualMachine.vm.allocator.destroy(this);
+ Pool.releaseValue(this);
}
};
@@ -2181,7 +2201,7 @@ pub const Timer = struct {
repeat: bool,
) !void {
if (comptime is_bindgen) unreachable;
- var timeout = try VirtualMachine.vm.allocator.create(Timeout);
+ var timeout = Pool.first(globalThis.bunVM().allocator);
js.JSValueProtect(globalThis.ref(), callback.asObjectRef());
timeout.* = Timeout{ .id = id, .callback = callback, .interval = countdown.toInt32(), .repeat = repeat };
var task = try Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, globalThis, timeout);
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index b79e6c7ab..55b829caa 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1850,6 +1850,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
has_js_deinited: bool = false,
listen_callback: JSC.AnyTask = undefined,
allocator: std.mem.Allocator,
+ keeping_js_alive: bool = false,
pub const Class = JSC.NewClass(
ThisServer,
@@ -1917,15 +1918,17 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
}
pub fn deinitIfWeCan(this: *ThisServer) void {
- if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited)
+ if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) {
+ this.deref();
this.deinit();
+ }
}
pub fn stop(this: *ThisServer) void {
if (this.listener) |listener| {
- listener.close();
this.listener = null;
- this.vm.disable_run_us_loop = false;
+ this.deref();
+ listener.close();
}
this.deinitIfWeCan();
@@ -2038,22 +2041,26 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
this.listener = socket;
const needs_post_handler = this.vm.uws_event_loop == null;
this.vm.uws_event_loop = uws.Loop.get();
- this.listen_callback = JSC.AnyTask.New(ThisServer, run).init(this);
- this.vm.eventLoop().enqueueTask(JSC.Task.init(&this.listen_callback));
+ this.ref();
+
if (needs_post_handler) {
_ = this.vm.uws_event_loop.?.addPostHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick);
+ _ = this.vm.uws_event_loop.?.addPreHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick);
}
}
- pub fn run(this: *ThisServer) void {
- // this.app.addServerName(hostname_pattern: [*:0]const u8)
+ pub fn ref(this: *ThisServer) void {
+ if (this.keeping_js_alive) return;
+
+ this.vm.us_loop_reference_count +|= 1;
+ this.keeping_js_alive = true;
+ }
- // we do not increment the reference count here
- // uWS manages running the loop, so it is unnecessary
- // this.vm.us_loop_reference_count +|= 1;
- this.vm.disable_run_us_loop = true;
+ pub fn deref(this: *ThisServer) void {
+ if (!this.keeping_js_alive) return;
- this.app.run();
+ this.vm.us_loop_reference_count -|= 1;
+ this.keeping_js_alive = false;
}
pub fn onBunInfoRequest(this: *ThisServer, req: *uws.Request, resp: *App.Response) void {
@@ -2286,11 +2293,20 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
this.app.get("/src:/*", *ThisServer, this, onSrcRequest);
}
- this.app.listenWithConfig(*ThisServer, this, onListen, .{
- .port = this.config.port,
- .host = this.config.hostname,
- .options = 0,
- });
+ const hostname = bun.span(this.config.hostname);
+
+ if (!(hostname.len == 0 or strings.eqlComptime(hostname, "0.0.0.0"))) {
+ this.app.listenWithConfig(*ThisServer, this, onListen, .{
+ .port = this.config.port,
+ .options = 0,
+ });
+ } else {
+ this.app.listenWithConfig(*ThisServer, this, onListen, .{
+ .port = this.config.port,
+ .host = this.config.hostname,
+ .options = 0,
+ });
+ }
}
};
}
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 2ded4e5cb..6a4cc7469 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -2770,7 +2770,6 @@ pub fn castObj(obj: js.JSObjectRef, comptime Type: type) *Type {
const JSNode = @import("../js_ast.zig").Macro.JSNode;
const LazyPropertiesObject = @import("../js_ast.zig").Macro.LazyPropertiesObject;
const ModuleNamespace = @import("../js_ast.zig").Macro.ModuleNamespace;
-const FetchTaskletContext = Fetch.FetchTasklet.FetchTaskletContext;
const Expect = Test.Expect;
const DescribeScope = Test.DescribeScope;
const TestScope = Test.TestScope;
@@ -2824,7 +2823,6 @@ pub const JSPrivateDataPtr = TaggedPointerUnion(.{
Expect,
ExpectPrototype,
FetchEvent,
- FetchTaskletContext,
HTMLRewriter,
JSNode,
LazyPropertiesObject,
@@ -3497,25 +3495,26 @@ pub fn wrapWithHasContainer(
}
if (comptime maybe_async) {
- var vm = ctx.ptr().bunVM();
- vm.tick();
-
- var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result);
-
- switch (promise.status(ctx.ptr().vm())) {
- JSC.JSPromise.Status.Pending => {
- while (promise.status(ctx.ptr().vm()) == .Pending) {
- vm.tick();
- }
- result = promise.result(ctx.ptr().vm());
- },
- JSC.JSPromise.Status.Rejected => {
- result = promise.result(ctx.ptr().vm());
- exception.* = result.asObjectRef();
- },
- JSC.JSPromise.Status.Fulfilled => {
- result = promise.result(ctx.ptr().vm());
- },
+ if (result.asPromise() != null or result.asInternalPromise() != null) {
+ var vm = ctx.ptr().bunVM();
+ vm.tick();
+ var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result);
+
+ switch (promise.status(ctx.ptr().vm())) {
+ JSC.JSPromise.Status.Pending => {
+ while (promise.status(ctx.ptr().vm()) == .Pending) {
+ vm.tick();
+ }
+ result = promise.result(ctx.ptr().vm());
+ },
+ JSC.JSPromise.Status.Rejected => {
+ result = promise.result(ctx.ptr().vm());
+ exception.* = result.asObjectRef();
+ },
+ JSC.JSPromise.Status.Fulfilled => {
+ result = promise.result(ctx.ptr().vm());
+ },
+ }
}
}
diff --git a/src/bun.js/bindings/JSBuffer.cpp b/src/bun.js/bindings/JSBuffer.cpp
index 4b509f257..a1b9a5d40 100644
--- a/src/bun.js/bindings/JSBuffer.cpp
+++ b/src/bun.js/bindings/JSBuffer.cpp
@@ -248,8 +248,8 @@ static EncodedJSValue constructFromEncoding(JSGlobalObject* lexicalGlobalObject,
result = Bun__encoding__constructFromLatin1(lexicalGlobalObject, view.characters8(), view.length(), static_cast<uint8_t>(encoding));
break;
}
- case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1
- case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion.
+ case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1
+ case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion.
result = JSBuffer__bufferFromPointerAndLength(lexicalGlobalObject, view.characters8(), view.length());
break;
}
@@ -1216,7 +1216,7 @@ static inline JSC::EncodedJSValue jsBufferPrototypeFunction_writeBody(JSC::JSGlo
if (callFrame->argumentCount() > 2) {
uint32_t arg_len = 0;
arg_len = callFrame->argument(2).toUInt32(lexicalGlobalObject);
- length = std::min(arg_len, length-offset);
+ length = std::min(arg_len, length - offset);
}
if (callFrame->argumentCount() > 2) {
@@ -1329,7 +1329,6 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSBufferConstructor::construc
JSC::JSObject* constructor = lexicalGlobalObject->m_typedArrayUint8.constructor(lexicalGlobalObject);
- // TODO: avoid this copy
MarkedArgumentBuffer args;
for (size_t i = 0; i < argsCount; ++i)
args.append(callFrame->uncheckedArgument(i));
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 51d55d2e0..a68376872 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- this.allocator.destroy(this);
- }
- };
-}
-
-pub fn SerialPromiseTask(comptime Context: type) type {
- return struct {
- const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1);
- const This = @This();
-
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- promise: JSValue,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.event_loop,
- .ctx = value,
- .allocator = allocator,
- .promise = JSValue.createInternalPromise(globalThis),
- .globalThis = globalThis,
- };
- js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx);
- this.onFinish();
- }
-
- pub fn runFromJS(this: This) void {
- var promise_value = this.promise;
- var promise = promise_value.asInternalPromise() orelse {
- if (comptime @hasDecl(Context, "deinit")) {
- @call(.{}, Context.deinit, .{this.ctx});
- }
- return;
- };
-
- var ctx = this.ctx;
-
- js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
- ctx.then(promise, this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- SerialWorkPool.schedule(&this.task);
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type {
event_loop: *JSC.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
+ concurrent_task: ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- var allocator = this.allocator;
- this.* = undefined;
- allocator.destroy(this);
- }
- };
-}
-
-pub fn AsyncNativeCallbackTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.eventLoop(),
- .ctx = value,
- .allocator = allocator,
- .globalThis = globalThis,
- };
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx, this);
- }
-
- pub fn runFromJS(this: This) void {
- this.ctx.runFromJS(this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- WorkPool.get().schedule(WorkPool.schedule(&this.task));
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{
// PromiseTask,
// TimeoutTasklet,
});
+const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
+pub const ConcurrentTask = struct {
+ task: Task = undefined,
+ next: ?*ConcurrentTask = null,
+
+ pub const Queue = UnboundedQueue(ConcurrentTask, .next);
+
+ pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask {
+ this.* = .{
+ .task = Task.init(of),
+ .next = null,
+ };
+ return this;
+ }
+};
const AsyncIO = @import("io");
pub const EventLoop = struct {
- ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
tasks: Queue = undefined,
- concurrent_tasks: Queue = undefined,
- concurrent_lock: Lock = Lock.init(),
+ concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
waker: ?AsyncIO.Waker = null,
-
+ defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
- var finished: u32 = 0;
var global = this.global;
var global_vm = global.vm();
var vm_ = this.virtual_machine;
+ var counter: usize = 0;
while (this.tasks.readItem()) |task| {
+ defer counter += 1;
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
- finished += 1;
},
.MicrotaskForDefaultGlobalObject => {
var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
micro.run(global);
- finished += 1;
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
- finished += 1;
+ fetch_task.deinit();
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
- finished += 1;
vm_.active_tasks -|= 1;
},
else => if (Environment.allow_assert) {
@@ -390,30 +287,39 @@ pub const EventLoop = struct {
global_vm.drainMicrotasks();
}
- if (finished > 0) {
- _ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
+ if (this.tasks.count == 0) {
+ this.tasks.head = 0;
}
- return finished;
+ return @truncate(u32, counter);
}
pub fn tickConcurrent(this: *EventLoop) void {
- if (this.ready_tasks_count.load(.Monotonic) > 0) {
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
+ _ = this.tickConcurrentWithCount();
+ }
- // TODO: optimzie
- this.tasks.ensureUnusedCapacity(add) catch unreachable;
+ pub fn tickConcurrentWithCount(this: *EventLoop) usize {
+ var concurrent = this.concurrent_tasks.popBatch();
+ const count = concurrent.count;
+ if (count == 0)
+ return 0;
- {
- this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
- this.concurrent_tasks.discard(this.concurrent_tasks.count);
- }
+ var iter = concurrent.iterator();
+ const start_count = this.tasks.count;
+ if (start_count == 0) {
+ this.tasks.head = 0;
+ }
- _ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
- _ = this.ready_tasks_count.fetchSub(add, .Monotonic);
+ this.tasks.ensureUnusedCapacity(count) catch unreachable;
+ var writable = this.tasks.writableSlice(0);
+ while (iter.next()) |task| {
+ writable[0] = task.task;
+ writable = writable[1..];
+ this.tasks.count += 1;
+ if (writable.len == 0) break;
}
+
+ return this.tasks.count - start_count;
}
// TODO: fix this technical debt
@@ -423,7 +329,9 @@ pub const EventLoop = struct {
this.tickConcurrent();
var global_vm = ctx.global.vm();
while (true) {
- while (this.tickWithCount() > 0) {} else {
+ while (this.tickWithCount() > 0) {
+ this.tickConcurrent();
+ } else {
global_vm.releaseWeakRefs();
global_vm.drainMicrotasks();
this.tickConcurrent();
@@ -436,13 +344,28 @@ pub const EventLoop = struct {
break;
}
- if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ this.global.handleRejectedPromises();
+ }
+
+ pub fn runUSocketsLoop(this: *EventLoop) void {
+ var ctx = this.virtual_machine;
+
+ ctx.global.vm().releaseWeakRefs();
+ ctx.global.vm().drainMicrotasks();
+
+ if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ if (this.tickConcurrentWithCount() > 0) {
+ this.tick();
+ } else if (ctx.uws_event_loop.?.num_polls > 0) {
+ if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) {
+ _ = ctx.global.vm().runGC(true);
+ }
+ }
+
ctx.is_us_loop_entered = true;
ctx.enterUWSLoop();
ctx.is_us_loop_entered = false;
}
-
- this.global.handleRejectedPromises();
}
// TODO: fix this technical debt
@@ -451,6 +374,10 @@ pub const EventLoop = struct {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
},
else => {},
@@ -459,13 +386,20 @@ pub const EventLoop = struct {
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
- while (this.pending_tasks_count.load(.Monotonic) > 0) {
+ while (this.tasks.count > 0) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
+ } else {
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
- _ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
this.tasks.writeItem(task) catch unreachable;
}
@@ -476,19 +410,25 @@ pub const EventLoop = struct {
}
}
- pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
+ pub fn onDefer(this: *EventLoop) void {
+ this.defer_count.store(0, .Monotonic);
+ this.tick();
+ }
+
+ pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void {
JSC.markBinding();
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- this.concurrent_tasks.writeItem(task) catch unreachable;
+
+ this.concurrent_tasks.push(task);
+
if (this.virtual_machine.uws_event_loop) |loop| {
- loop.nextTick(*EventLoop, this, EventLoop.tick);
+ const deferCount = this.defer_count.fetchAdd(1, .Monotonic);
+ if (deferCount == 0) {
+ loop.nextTick(*EventLoop, this, onDefer);
+ }
}
- if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) {
- if (this.waker) |*waker| {
- waker.wake() catch unreachable;
- }
+ if (this.waker) |*waker| {
+ waker.wake() catch unreachable;
}
}
};
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 5eeff1ba8..4f97a79ad 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -368,7 +368,6 @@ pub const VirtualMachine = struct {
rare_data: ?*JSC.RareData = null,
poller: JSC.Poller = JSC.Poller{},
us_loop_reference_count: usize = 0,
- disable_run_us_loop: bool = false,
is_us_loop_entered: bool = false,
pub fn io(this: *VirtualMachine) *IO {
@@ -423,7 +422,7 @@ pub const VirtualMachine = struct {
this.eventLoop().enqueueTask(task);
}
- pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: Task) void {
+ pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: JSC.ConcurrentTask) void {
this.eventLoop().enqueueTaskConcurrent(task);
}
@@ -451,7 +450,7 @@ pub const VirtualMachine = struct {
this.macro_event_loop.tasks.ensureTotalCapacity(16) catch unreachable;
this.macro_event_loop.global = this.global;
this.macro_event_loop.virtual_machine = this;
- this.macro_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator);
+ this.macro_event_loop.concurrent_tasks = .{};
}
this.bundler.options.platform = .bun_macro;
@@ -555,8 +554,7 @@ pub const VirtualMachine = struct {
default_allocator,
);
VirtualMachine.vm.regular_event_loop.tasks.ensureUnusedCapacity(64) catch unreachable;
- VirtualMachine.vm.regular_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator);
- VirtualMachine.vm.regular_event_loop.concurrent_tasks.ensureUnusedCapacity(8) catch unreachable;
+ VirtualMachine.vm.regular_event_loop.concurrent_tasks = .{};
VirtualMachine.vm.event_loop = &VirtualMachine.vm.regular_event_loop;
vm.bundler.macro_context = null;
@@ -1435,6 +1433,7 @@ pub const VirtualMachine = struct {
pub fn loadEntryPoint(this: *VirtualMachine, entry_path: string) !*JSInternalPromise {
this.main = entry_path;
try this.entry_point.generate(@TypeOf(this.bundler), &this.bundler, Fs.PathName.init(entry_path), main_file_name);
+ this.eventLoop().ensureWaker();
var promise: *JSInternalPromise = undefined;
@@ -1455,7 +1454,15 @@ pub const VirtualMachine = struct {
promise = JSModuleLoader.loadAndEvaluateModule(this.global, &ZigString.init(this.main));
}
- this.waitForPromise(promise);
+ while (promise.status(this.global.vm()) == .Pending) {
+ this.eventLoop().tick();
+ _ = this.eventLoop().waker.?.wait() catch 0;
+ }
+
+ if (this.us_loop_reference_count > 0) {
+ _ = this.global.vm().runGC(true);
+ this.eventLoop().runUSocketsLoop();
+ }
return promise;
}
diff --git a/src/bun.js/unbounded_queue.zig b/src/bun.js/unbounded_queue.zig
new file mode 100644
index 000000000..fd092290d
--- /dev/null
+++ b/src/bun.js/unbounded_queue.zig
@@ -0,0 +1,149 @@
+const std = @import("std");
+
+const os = std.os;
+const mem = std.mem;
+const meta = std.meta;
+const atomic = std.atomic;
+const builtin = std.builtin;
+const testing = std.testing;
+
+const assert = std.debug.assert;
+
+const mpsc = @This();
+
+pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
+ .x86_64, .aarch64, .powerpc64 => 128,
+ .arm, .mips, .mips64, .riscv64 => 32,
+ .s390x => 256,
+ else => 64,
+};
+
+pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
+ const next = meta.fieldInfo(T, next_field).name;
+
+ return struct {
+ const Self = @This();
+
+ pub const Batch = struct {
+ pub const Iterator = struct {
+ batch: Self.Batch,
+
+ pub fn next(self: *Self.Batch.Iterator) ?*T {
+ if (self.batch.count == 0) return null;
+ const front = self.batch.front orelse unreachable;
+ self.batch.front = @field(front, next);
+ self.batch.count -= 1;
+ return front;
+ }
+ };
+
+ front: ?*T = null,
+ last: ?*T = null,
+ count: usize = 0,
+
+ pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
+ return .{ .batch = self };
+ }
+ };
+
+ pub const queue_padding_length = cache_line_length / 2;
+
+ back: ?*T align(queue_padding_length) = null,
+ count: usize = 0,
+ front: T align(queue_padding_length) = init: {
+ var stub: T = undefined;
+ @field(stub, next) = null;
+ break :init stub;
+ },
+
+ pub fn push(self: *Self, src: *T) void {
+ assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);
+
+ @field(src, next) = null;
+ const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
+ @field(old_back, next) = src;
+ }
+
+ pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
+ assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);
+
+ @field(last, next) = null;
+ const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
+ @field(old_back, next) = first;
+ }
+
+ pub fn pop(self: *Self) ?*T {
+ const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;
+ if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+ const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+ if (first != last) return null;
+ @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+ if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+ var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+ while (next_item == null) : (atomic.spinLoopHint()) {
+ next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+ }
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+
+ pub fn popBatch(self: *Self) Self.Batch {
+ var batch: Self.Batch = .{};
+
+ var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
+ batch.front = front;
+
+ var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
+ batch.count += 1;
+ batch.last = front;
+
+ front = next_node;
+ }
+
+ const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+ if (front != last) {
+ @atomicStore(?*T, &@field(self.front, next), front, .Release);
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+ if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+ batch.count += 1;
+ batch.last = front;
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ while (next_item == null) : (atomic.spinLoopHint()) {
+ next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ }
+
+ batch.count += 1;
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ batch.last = front;
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ pub fn peek(self: *Self) usize {
+ const count = @atomicLoad(usize, &self.count, .Acquire);
+ assert(count >= 0);
+ return count;
+ }
+
+ pub fn isEmpty(self: *Self) bool {
+ return self.peek() == 0;
+ }
+ };
+}
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 6c1fc49f9..2ef8225f3 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -515,90 +515,79 @@ pub const Fetch = struct {
);
pub const FetchTasklet = struct {
- promise: *JSPromise = undefined,
- http: HTTPClient.AsyncHTTP = undefined,
- result: HTTPClient.HTTPClientResult = undefined,
- status: Status = Status.pending,
+ http: ?*HTTPClient.AsyncHTTP = null,
+ result: HTTPClient.HTTPClientResult = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
-
- empty_request_body: MutableString = undefined,
-
- context: FetchTaskletContext = undefined,
+ request_body: Blob = undefined,
response_buffer: MutableString = undefined,
-
- blob_store: ?*Blob.Store = null,
-
- const Pool = ObjectPool(FetchTasklet, init, true, 32);
- const BodyPool = ObjectPool(MutableString, MutableString.init2048, true, 8);
- pub const FetchTaskletContext = struct {
- tasklet: *FetchTasklet,
- };
+ request_headers: Headers = Headers{ .allocator = undefined },
+ ref: *JSC.napi.Ref = undefined,
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
- pub const Status = enum(u8) {
- pending,
- running,
- done,
- };
+ fn clearData(this: *FetchTasklet) void {
+ this.request_headers.entries.deinit(bun.default_allocator);
+ this.request_headers.buf.deinit(bun.default_allocator);
+ this.request_headers = Headers{ .allocator = undefined };
+ this.http.?.deinit();
+
+ this.result.deinitMetadata();
+ this.response_buffer.deinit();
+ this.request_body.detach();
+ }
+
+ pub fn deinit(this: *FetchTasklet) void {
+ if (this.http) |http| this.javascript_vm.allocator.destroy(http);
+ this.javascript_vm.allocator.destroy(this);
+ }
pub fn onDone(this: *FetchTasklet) void {
if (comptime JSC.is_bindgen)
unreachable;
const globalThis = this.global_this;
- const promise = this.promise;
- const state = this.http.state.load(.Monotonic);
- const result = switch (state) {
- .success => this.onResolve(),
- .fail => this.onReject(),
- else => unreachable,
+
+ var ref = this.ref;
+ const promise_value = ref.get(globalThis);
+ defer ref.destroy(globalThis);
+
+ if (promise_value.isEmptyOrUndefinedOrNull()) {
+ this.clearData();
+ return;
+ }
+
+ var promise = promise_value.asPromise().?;
+
+ const success = this.result.isSuccess();
+ const result = switch (success) {
+ true => this.onResolve(),
+ false => this.onReject(),
};
- this.release();
- const promise_value = promise.asValue(globalThis);
- promise_value.unprotect();
+ this.clearData();
- switch (state) {
- .success => {
+ promise_value.ensureStillAlive();
+
+ switch (success) {
+ true => {
promise.resolve(globalThis, result);
},
- .fail => {
+ false => {
promise.reject(globalThis, result);
},
- else => unreachable,
}
}
- pub fn reset(_: *FetchTasklet) void {}
-
- pub fn release(this: *FetchTasklet) void {
- this.global_this = undefined;
- this.javascript_vm = undefined;
- this.promise = undefined;
- this.status = Status.pending;
- // var pooled = this.pooled_body;
- // BodyPool.release(pooled);
- // this.pooled_body = undefined;
- this.http = undefined;
-
- Pool.release(@fieldParentPtr(Pool.Node, "data", this));
- }
-
pub fn onReject(this: *FetchTasklet) JSValue {
- if (this.blob_store) |store| {
- this.blob_store = null;
- store.deref();
- }
- defer this.result.deinitMetadata();
const fetch_error = std.fmt.allocPrint(
default_allocator,
"fetch() failed {s}\nurl: \"{s}\"",
.{
- @errorName(this.http.err orelse error.HTTPFail),
+ this.result.fail,
this.result.href,
},
) catch unreachable;
@@ -606,26 +595,27 @@ pub const Fetch = struct {
}
pub fn onResolve(this: *FetchTasklet) JSValue {
- var allocator = default_allocator;
- var http_response = this.http.response.?;
+ var allocator = this.global_this.bunVM().allocator;
+ const http_response = this.result.response;
var response = allocator.create(Response) catch unreachable;
- if (this.blob_store) |store| {
- this.blob_store = null;
- store.deref();
- }
- defer this.result.deinitMetadata();
+ const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this);
+ this.response_buffer = .{ .allocator = default_allocator, .list = .{
+ .items = &.{},
+ .capacity = 0,
+ } };
+
response.* = Response{
.allocator = allocator,
.url = allocator.dupe(u8, this.result.href) catch unreachable,
.status_text = allocator.dupe(u8, http_response.status) catch unreachable,
- .redirected = this.http.redirected,
+ .redirected = this.result.redirected,
.body = .{
.init = .{
.headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers),
.status_code = @truncate(u16, http_response.status_code),
},
.value = .{
- .Blob = Blob.init(this.http.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this),
+ .Blob = blob,
},
},
};
@@ -636,38 +626,50 @@ pub const Fetch = struct {
allocator: std.mem.Allocator,
method: Method,
url: ZigURL,
- headers: Headers.Entries,
- headers_buf: string,
- request_body: ?*MutableString,
+ headers: Headers,
+ request_body: Blob,
timeout: usize,
- request_body_store: ?*Blob.Store,
- ) !*FetchTasklet.Pool.Node {
- var linked_list = FetchTasklet.Pool.get(allocator);
- linked_list.data.javascript_vm = VirtualMachine.vm;
- linked_list.data.empty_request_body = MutableString.init(allocator, 0) catch unreachable;
- // linked_list.data.pooled_body = BodyPool.get(allocator);
- linked_list.data.blob_store = request_body_store;
- linked_list.data.response_buffer = MutableString.initEmpty(allocator);
- linked_list.data.http = HTTPClient.AsyncHTTP.init(
+ globalThis: *JSC.JSGlobalObject,
+ promise: JSValue,
+ ) !*FetchTasklet {
+ var jsc_vm = globalThis.bunVM();
+ var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet);
+ if (request_body.store) |store| {
+ store.ref();
+ }
+
+ fetch_tasklet.* = .{
+ .response_buffer = MutableString{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
+ .http = try jsc_vm.allocator.create(HTTPClient.AsyncHTTP),
+ .javascript_vm = jsc_vm,
+ .request_body = request_body,
+ .global_this = globalThis,
+ .request_headers = headers,
+ .ref = JSC.napi.Ref.create(globalThis, promise),
+ };
+ fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
allocator,
method,
url,
- headers,
- headers_buf,
- &linked_list.data.response_buffer,
- request_body orelse &linked_list.data.empty_request_body,
+ headers.entries,
+ headers.buf.items,
+ &fetch_tasklet.response_buffer,
+ request_body.sharedView(),
timeout,
- undefined,
- );
- linked_list.data.context = .{ .tasklet = &linked_list.data };
- linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New(
- *FetchTasklet,
- FetchTasklet.callback,
- ).init(
- &linked_list.data,
+ HTTPClient.HTTPClientResult.Callback.New(
+ *FetchTasklet,
+ FetchTasklet.callback,
+ ).init(
+ fetch_tasklet,
+ ),
);
-
- return linked_list;
+ return fetch_tasklet;
}
pub fn queue(
@@ -675,27 +677,36 @@ pub const Fetch = struct {
global: *JSGlobalObject,
method: Method,
url: ZigURL,
- headers: Headers.Entries,
- headers_buf: string,
- request_body: ?*MutableString,
+ headers: Headers,
+ request_body: Blob,
timeout: usize,
- request_body_store: ?*Blob.Store,
- ) !*FetchTasklet.Pool.Node {
+ promise: JSValue,
+ ) !*FetchTasklet {
try HTTPClient.HTTPThread.init();
- var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store);
+ var node = try get(
+ allocator,
+ method,
+ url,
+ headers,
+ request_body,
+ timeout,
+ global,
+ promise,
+ );
- node.data.global_this = global;
var batch = NetworkThread.Batch{};
- node.data.http.schedule(allocator, &batch);
- HTTPClient.http_thread.schedule(batch);
+ node.http.?.schedule(allocator, &batch);
VirtualMachine.vm.active_tasks +|= 1;
+
+ HTTPClient.http_thread.schedule(batch);
+
return node;
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
task.response_buffer = result.body.?.*;
task.result = result;
- task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task));
+ task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task));
}
};
@@ -715,12 +726,11 @@ pub const Fetch = struct {
}
var headers: ?Headers = null;
- var body: MutableString = MutableString.initEmpty(bun.default_allocator);
var method = Method.GET;
var args = JSC.Node.ArgumentsSlice.from(ctx.bunVM(), arguments);
var url: ZigURL = undefined;
var first_arg = args.nextEat().?;
- var blob_store: ?*Blob.Store = null;
+ var body: Blob = Blob.initEmpty(ctx);
if (first_arg.isString()) {
var url_zig_str = ZigString.init("");
JSValue.fromRef(arguments[0]).toZigString(&url_zig_str, globalThis);
@@ -737,7 +747,6 @@ pub const Fetch = struct {
url_str = getAllocator(ctx).dupe(u8, url_str) catch unreachable;
}
- NetworkThread.init() catch @panic("Failed to start network thread");
url = ZigURL.parse(url_str);
if (arguments.len >= 2 and arguments[1].?.value().isObject()) {
@@ -760,18 +769,7 @@ pub const Fetch = struct {
if (options.fastGet(ctx.ptr(), .body)) |body__| {
if (Blob.fromJS(ctx.ptr(), body__, true, false)) |new_blob| {
- if (new_blob.size > 0) {
- body = MutableString{
- .list = std.ArrayListUnmanaged(u8){
- .items = bun.constStrToU8(new_blob.sharedView()),
- .capacity = new_blob.size,
- },
- .allocator = bun.default_allocator,
- };
- blob_store = new_blob.store;
- }
- // transfer is unnecessary here because this is a new slice
- //new_blob.transfer();
+ body = new_blob;
} else |_| {
return JSPromise.rejectedPromiseValue(globalThis, ZigString.init("fetch() received invalid body").toErrorInstance(globalThis)).asRef();
}
@@ -783,54 +781,28 @@ pub const Fetch = struct {
if (request.headers) |head| {
headers = Headers.from(head, bun.default_allocator) catch unreachable;
}
- var blob = request.body.use();
- // TODO: make RequestBody _NOT_ a MutableString
- body = MutableString{
- .list = std.ArrayListUnmanaged(u8){
- .items = bun.constStrToU8(blob.sharedView()),
- .capacity = bun.constStrToU8(blob.sharedView()).len,
- },
- .allocator = blob.allocator orelse bun.default_allocator,
- };
- blob_store = blob.store;
+ body = request.body.use();
} else {
const fetch_error = fetch_type_error_strings.get(js.JSValueGetType(ctx, arguments[0]));
return JSPromise.rejectedPromiseValue(globalThis, ZigString.init(fetch_error).toErrorInstance(globalThis)).asRef();
}
- var header_entries: Headers.Entries = .{};
- var header_buf: string = "";
-
- if (headers) |head| {
- header_entries = head.entries;
- header_buf = head.buf.items;
- }
-
- var request_body: ?*MutableString = null;
- if (body.list.items.len > 0) {
- var mutable = bun.default_allocator.create(MutableString) catch unreachable;
- mutable.* = body;
- request_body = mutable;
- }
+ var deferred_promise = JSC.C.JSObjectMakeDeferredPromise(globalThis, null, null, null);
// var resolve = FetchTasklet.FetchResolver.Class.make(ctx: js.JSContextRef, ptr: *ZigType)
- var queued = FetchTasklet.queue(
+ _ = FetchTasklet.queue(
default_allocator,
globalThis,
method,
url,
- header_entries,
- header_buf,
- request_body,
+ headers orelse Headers{
+ .allocator = bun.default_allocator,
+ },
+ body,
std.time.ns_per_hour,
- blob_store,
+ JSC.JSValue.fromRef(deferred_promise),
) catch unreachable;
- const promise = JSC.JSPromise.create(ctx);
- queued.data.promise = promise;
- const promise_value = promise.asValue(ctx);
- promise_value.protect();
-
- return promise_value.asObjectRef();
+ return deferred_promise;
}
};
@@ -1991,7 +1963,7 @@ pub const Blob = struct {
}
}
pub fn run(this: *ReadFile, task: *ReadFileTask) void {
- var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
+ var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
_ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task });
}
@@ -2020,7 +1992,7 @@ pub const Blob = struct {
task.onFinish();
suspend {
- HTTPClient.getAllocator().destroy(@frame());
+ bun.default_allocator.destroy(@frame());
}
}
@@ -2236,7 +2208,7 @@ pub const Blob = struct {
cb(cb_ctx, .{ .result = @truncate(SizeType, wrote) });
}
pub fn run(this: *WriteFile, task: *WriteFileTask) void {
- var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
+ var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
_ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task });
}
@@ -2244,7 +2216,7 @@ pub const Blob = struct {
this._runAsync();
task.onFinish();
suspend {
- HTTPClient.getAllocator().destroy(@frame());
+ bun.default_allocator.destroy(@frame());
}
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 242e3db49..1158a8dd2 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2340,6 +2340,7 @@ pub const FileBlobLoader = struct {
read_frame: anyframe = undefined,
chunk_size: Blob.SizeType = 0,
main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null },
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn taskCallback(task: *NetworkThread.Task) void {
var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
@@ -2475,7 +2476,7 @@ pub const FileBlobLoader = struct {
pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
this.concurrent.main_thread_task.ctx = this;
- this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task));
+ this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task));
}
fn runAsync(this: *FileBlobLoader) void {
diff --git a/src/bun_js.zig b/src/bun_js.zig
index bb5f458f3..87b618309 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -31,6 +31,7 @@ const which = @import("which.zig").which;
const VirtualMachine = @import("javascript_core").VirtualMachine;
const JSC = @import("javascript_core");
const AsyncHTTP = @import("http").AsyncHTTP;
+const Arena = @import("./mimalloc_arena.zig").Arena;
const OpaqueWrap = JSC.OpaqueWrap;
@@ -39,6 +40,7 @@ pub const Run = struct {
ctx: Command.Context,
vm: *VirtualMachine,
entry_path: string,
+ arena: Arena = undefined,
pub fn boot(ctx: Command.Context, file: std.fs.File, entry_path: string) !void {
if (comptime JSC.is_bindgen) unreachable;
@@ -46,13 +48,16 @@ pub const Run = struct {
js_ast.Expr.Data.Store.create(default_allocator);
js_ast.Stmt.Data.Store.create(default_allocator);
+ var arena = try Arena.init();
var run = Run{
- .vm = try VirtualMachine.init(ctx.allocator, ctx.args, null, ctx.log, null),
+ .vm = try VirtualMachine.init(arena.allocator(), ctx.args, null, ctx.log, null),
.file = file,
+ .arena = arena,
.ctx = ctx,
.entry_path = entry_path,
};
+ run.vm.arena = &run.arena;
run.vm.argv = ctx.positionals;
@@ -131,19 +136,21 @@ pub const Run = struct {
}
this.vm.global.vm().releaseWeakRefs();
+ _ = this.vm.arena.gc(false);
_ = this.vm.global.vm().runGC(false);
this.vm.tick();
{
var any = false;
- while (this.vm.*.event_loop.pending_tasks_count.loadUnchecked() > 0 or this.vm.active_tasks > 0) {
+ while (this.vm.eventLoop().tasks.count > 0 or this.vm.active_tasks > 0) {
this.vm.tick();
any = true;
if (this.vm.active_tasks > 0) {
- if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0) {
+ if (this.vm.eventLoop().tickConcurrentWithCount() == 0) {
+ _ = this.vm.arena.gc(false);
_ = this.vm.global.vm().runGC(false);
- if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0 and
+ if (this.vm.eventLoop().tickConcurrentWithCount() == 0 and
this.vm.active_tasks > 0)
{
this.vm.event_loop.ensureWaker();
diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig
index ee6266f40..27531e4ad 100644
--- a/src/cli/create_command.zig
+++ b/src/cli/create_command.zig
@@ -1845,8 +1845,6 @@ pub const Example = struct {
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 8096);
- var request_body = try MutableString.init(ctx.allocator, 0);
-
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
async_http.* = HTTP.AsyncHTTP.initSync(
@@ -1856,7 +1854,7 @@ pub const Example = struct {
header_entries,
headers_buf,
mutable,
- &request_body,
+ "",
60 * std.time.ns_per_min,
);
async_http.client.progress_node = progress;
@@ -1916,12 +1914,20 @@ pub const Example = struct {
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
- var request_body = try MutableString.init(ctx.allocator, 0);
url = URL.parse(try std.fmt.bufPrint(&url_buf, "https://registry.npmjs.org/@bun-examples/{s}/latest", .{name}));
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ url,
+ .{},
+ "",
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ );
async_http.client.progress_node = progress;
var response = try async_http.sendSync(true);
@@ -1993,7 +1999,16 @@ pub const Example = struct {
mutable.reset();
// ensure very stable memory address
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ URL.parse(tarball_url),
+ .{},
+ "",
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ );
async_http.client.progress_node = progress;
refresher.maybeRefresh();
@@ -2018,7 +2033,6 @@ pub const Example = struct {
url = URL.parse(examples_url);
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- var request_body = try MutableString.init(ctx.allocator, 0);
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
@@ -2029,7 +2043,7 @@ pub const Example = struct {
.{},
"",
mutable,
- &request_body,
+ "",
60 * std.time.ns_per_min,
);
diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig
index 92691fa05..1f3ea95cb 100644
--- a/src/cli/test_command.zig
+++ b/src/cli/test_command.zig
@@ -36,6 +36,7 @@ var path_buf: [bun.MAX_PATH_BYTES]u8 = undefined;
var path_buf2: [bun.MAX_PATH_BYTES]u8 = undefined;
const PathString = bun.PathString;
const is_bindgen = std.meta.globalOption("bindgen", bool) orelse false;
+const HTTPThread = @import("http").HTTPThread;
const JSC = @import("javascript_core");
const Jest = JSC.Jest;
@@ -296,6 +297,7 @@ pub const TestCommand = struct {
};
JSC.C.JSCInitialize();
NetworkThread.init() catch {};
+ HTTPThread.init() catch {};
var reporter = try ctx.allocator.create(CommandLineReporter);
reporter.* = CommandLineReporter{
.jest = TestRunner{
@@ -443,10 +445,6 @@ pub const TestCommand = struct {
Output.flush();
var promise = try vm.loadEntryPoint(resolution.path_pair.primary.text);
- while (promise.status(vm.global.vm()) == .Pending) {
- vm.tick();
- }
-
switch (promise.status(vm.global.vm())) {
.Rejected => {
var result = promise.result(vm.global.vm());
diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig
index 15881e984..912dbe921 100644
--- a/src/cli/upgrade_command.zig
+++ b/src/cli/upgrade_command.zig
@@ -207,7 +207,6 @@ pub const UpgradeCommand = struct {
}
var metadata_body = try MutableString.init(allocator, 2048);
- var request_body = try MutableString.init(allocator, 0);
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable;
@@ -218,7 +217,7 @@ pub const UpgradeCommand = struct {
header_entries,
headers_buf,
&metadata_body,
- &request_body,
+ "",
60 * std.time.ns_per_min,
);
if (!silent) async_http.client.progress_node = progress;
@@ -441,7 +440,6 @@ pub const UpgradeCommand = struct {
var async_http = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
var zip_file_buffer = try ctx.allocator.create(MutableString);
zip_file_buffer.* = try MutableString.init(ctx.allocator, @maximum(version.size, 1024));
- var request_buffer = try MutableString.init(ctx.allocator, 0);
async_http.* = HTTP.AsyncHTTP.initSync(
ctx.allocator,
@@ -450,7 +448,7 @@ pub const UpgradeCommand = struct {
.{},
"",
zip_file_buffer,
- &request_buffer,
+ "",
timeout,
);
async_http.client.timeout = timeout;
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index a2a7da143..103a8dc7c 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -262,11 +262,58 @@ pub const SocketTLS = NewSocketHandler(true);
pub const us_timer_t = opaque {};
pub const us_socket_context_t = opaque {};
-pub const Loop = opaque {
+pub const Loop = extern struct {
+ internal_loop_data: InternalLoopData align(16),
+
+ /// Number of non-fallthrough polls in the loop
+ num_polls: c_int,
+
+ /// Number of ready polls this iteration
+ num_ready_polls: c_int,
+
+ /// Current index in list of ready polls
+ current_ready_poll: c_int,
+
+ /// Loop's own file descriptor
+ fd: c_int,
+
+ /// The list of ready polls
+ ready_polls: [1024]EventType,
+
+ const EventType = if (Environment.isLinux) std.os.linux.epoll_event else if (Environment.isMac) std.os.Kevent;
+
+ pub const InternalLoopData = extern struct {
+ pub const us_internal_async = opaque {};
+
+ sweep_timer: ?*us_timer_t,
+ wakeup_async: ?*us_internal_async,
+ last_write_failed: c_int,
+ head: ?*us_socket_context_t,
+ iterator: ?*us_socket_context_t,
+ recv_buf: [*]u8,
+ ssl_data: ?*anyopaque,
+ pre_cb: ?fn (?*Loop) callconv(.C) void,
+ post_cb: ?fn (?*Loop) callconv(.C) void,
+ closed_head: ?*Socket,
+ low_prio_head: ?*Socket,
+ low_prio_budget: c_int,
+ iteration_nr: c_longlong,
+ };
+
pub fn get() ?*Loop {
return uws_get_loop();
}
+ pub fn create(comptime Handler: anytype) *Loop {
+ return us_create_loop(
+ null,
+ Handler.wakeup,
+ if (@hasDecl(Handler, "pre")) Handler.pre else null,
+ if (@hasDecl(Handler, "post")) Handler.post else null,
+ 0,
+ ).?;
+ }
+
pub fn wakeup(this: *Loop) void {
return us_wakeup_loop(this);
}
@@ -305,6 +352,15 @@ pub const Loop = opaque {
};
}
+ pub fn addPreHandler(this: *Loop, comptime UserType: type, ctx: UserType, comptime callback: fn (UserType) void) NewHandler(UserType, callback) {
+ const Handler = NewHandler(UserType, callback);
+
+ uws_loop_addPreHandler(this, ctx, Handler.callback);
+ return Handler{
+ .loop = this,
+ };
+ }
+
pub fn run(this: *Loop) void {
us_loop_run(this);
}
@@ -312,7 +368,7 @@ pub const Loop = opaque {
extern fn uws_loop_defer(loop: *Loop, ctx: *anyopaque, cb: fn (ctx: *anyopaque) callconv(.C) void) void;
extern fn uws_get_loop() ?*Loop;
- extern fn us_create_loop(hint: ?*anyopaque, wakeup_cb: ?fn (?*Loop) callconv(.C) void, pre_cb: ?fn (?*Loop) callconv(.C) void, post_cb: ?fn (?*Loop) callconv(.C) void, ext_size: c_uint) ?*Loop;
+ extern fn us_create_loop(hint: ?*anyopaque, wakeup_cb: ?fn (*Loop) callconv(.C) void, pre_cb: ?fn (*Loop) callconv(.C) void, post_cb: ?fn (*Loop) callconv(.C) void, ext_size: c_uint) ?*Loop;
extern fn us_loop_free(loop: ?*Loop) void;
extern fn us_loop_ext(loop: ?*Loop) ?*anyopaque;
extern fn us_loop_run(loop: ?*Loop) void;
@@ -322,6 +378,8 @@ pub const Loop = opaque {
extern fn us_loop_iteration_number(loop: ?*Loop) c_longlong;
extern fn uws_loop_addPostHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void;
extern fn uws_loop_removePostHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void;
+ extern fn uws_loop_addPreHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void;
+ extern fn uws_loop_removePreHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void;
};
const uintmax_t = c_ulong;
@@ -372,9 +430,8 @@ pub const Poll = opaque {
val: Data,
fallthrough: bool,
flags: Flags,
- callback: CallbackType,
) ?*Poll {
- var poll = us_create_callback(loop, @as(c_int, @boolToInt(fallthrough)), file, @sizeOf(Data));
+ var poll = us_create_poll(loop, @as(c_int, @boolToInt(fallthrough)), @sizeOf(Data));
if (comptime Data != void) {
poll.data(Data).* = val;
}
@@ -386,8 +443,7 @@ pub const Poll = opaque {
if (flags.write) {
flags_int |= Flags.write_flag;
}
-
- us_callback_set(poll, flags_int, callback);
+ us_poll_init(poll, file, flags_int);
return poll;
}
@@ -403,10 +459,17 @@ pub const Poll = opaque {
return @intCast(@import("std").os.fd_t, us_poll_fd(self));
}
- pub fn start(self: *Poll, poll_type: Flags) void {
- // us_poll_start(self, loop: ?*Loop, events: c_int)
- _ = self;
- _ = poll_type;
+ pub fn start(self: *Poll, loop: *Loop, flags: Flags) void {
+ var flags_int: c_int = 0;
+ if (flags.read) {
+ flags_int |= Flags.read_flag;
+ }
+
+ if (flags.write) {
+ flags_int |= Flags.write_flag;
+ }
+
+ us_poll_start(self, loop, flags_int);
}
pub const Flags = struct {
@@ -424,9 +487,9 @@ pub const Poll = opaque {
}
// (void* userData, int fd, int events, int error, struct us_poll_t *poll)
- pub const CallbackType = fn (?*anyopaque, c_int, c_int, c_int, *Poll) void;
- extern fn us_create_callback(loop: ?*Loop, fallthrough: c_int, fd: c_int, ext_size: c_uint) *Poll;
- extern fn us_callback_set(poll: *Poll, events: c_int, callback: CallbackType) *Poll;
+ pub const CallbackType = fn (?*anyopaque, c_int, c_int, c_int, *Poll) callconv(.C) void;
+ extern fn us_create_poll(loop: ?*Loop, fallthrough: c_int, ext_size: c_uint) *Poll;
+ extern fn us_poll_set(poll: *Poll, events: c_int, callback: CallbackType) *Poll;
extern fn us_poll_free(p: ?*Poll, loop: ?*Loop) void;
extern fn us_poll_init(p: ?*Poll, fd: c_int, poll_type: c_int) void;
extern fn us_poll_start(p: ?*Poll, loop: ?*Loop, events: c_int) void;
diff --git a/src/feature_flags.zig b/src/feature_flags.zig
index 692ee6c1d..c3a9fea10 100644
--- a/src/feature_flags.zig
+++ b/src/feature_flags.zig
@@ -70,6 +70,7 @@ pub const verbose_analytics = false;
pub const disable_compression_in_http_client = false;
+pub const enable_keepalive = true;
// Not sure why...
// But this is slower!
// ~/Build/throw
diff --git a/src/global.zig b/src/global.zig
index 4db835be3..313a5a2ab 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -326,3 +326,5 @@ pub fn rand(bytes: []u8) void {
const BoringSSL = @import("boringssl");
_ = BoringSSL.RAND_bytes(bytes.ptr, bytes.len);
}
+
+pub const ObjectPool = @import("./pool.zig").ObjectPool;
diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig
index 6c9cc6004..64ee3f778 100644
--- a/src/http/websocket_http_client.zig
+++ b/src/http/websocket_http_client.zig
@@ -134,7 +134,7 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type {
pub fn register(global: *JSC.JSGlobalObject, loop_: *anyopaque, ctx_: *anyopaque) callconv(.C) void {
var vm = global.bunVM();
- var loop = @ptrCast(*uws.Loop, loop_);
+ var loop = @ptrCast(*uws.Loop, @alignCast(@alignOf(uws.Loop), loop_));
var ctx: *uws.us_socket_context_t = @ptrCast(*uws.us_socket_context_t, ctx_);
if (vm.uws_event_loop) |other| {
@@ -762,7 +762,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
pub fn register(global: *JSC.JSGlobalObject, loop_: *anyopaque, ctx_: *anyopaque) callconv(.C) void {
var vm = global.bunVM();
- var loop = @ptrCast(*uws.Loop, loop_);
+ var loop = @ptrCast(*uws.Loop, @alignCast(@alignOf(uws.Loop), loop_));
+
var ctx: *uws.us_socket_context_t = @ptrCast(*uws.us_socket_context_t, ctx_);
if (vm.uws_event_loop) |other| {
@@ -1416,7 +1417,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
adopted.receive_buffer.ensureTotalCapacity(2048) catch return null;
adopted.event_loop_ref = true;
adopted.globalThis.bunVM().us_loop_reference_count +|= 1;
- _ = globalThis.bunVM().eventLoop().ready_tasks_count.fetchAdd(1, .Monotonic);
+ globalThis.bunVM().active_tasks += 1;
+
var buffered_slice: []u8 = buffered_data[0..buffered_data_len];
if (buffered_slice.len > 0) {
const InitialDataHandler = struct {
@@ -1455,7 +1457,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
if (this.event_loop_ref) {
this.event_loop_ref = false;
this.globalThis.bunVM().us_loop_reference_count -|= 1;
- _ = this.globalThis.bunVM().eventLoop().ready_tasks_count.fetchSub(1, .Monotonic);
+ this.globalThis.bunVM().active_tasks -|= 1;
}
this.outgoing_websocket = null;
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 869ff8724..321b0efb2 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -39,6 +39,11 @@ pub var http_thread: HTTPThread = undefined;
const HiveArray = @import("./hive_array.zig").HiveArray;
const Batch = NetworkThread.Batch;
const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion;
+const DeadSocket = opaque {};
+var dead_socket = @intToPtr(*DeadSocket, 1);
+
+const print_every = 0;
+var print_every_i: usize = 0;
fn NewHTTPContext(comptime ssl: bool) type {
return struct {
@@ -48,28 +53,24 @@ fn NewHTTPContext(comptime ssl: bool) type {
hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined,
hostname_len: u8 = 0,
port: u16 = 0,
-
- pub fn close(this: *PooledSocket) void {
- this.* = undefined;
-
- if (comptime ssl) {
- http_thread.https_context.keep_alive_sockets.unset(http_thread.https_context.pending_sockets.indexOf(this).?);
- std.debug.assert(http_thread.https_context.pending_sockets.put(this));
- } else {
- http_thread.http_context.keep_alive_sockets.unset(http_thread.http_context.pending_sockets.indexOf(this).?);
- std.debug.assert(http_thread.http_context.pending_sockets.put(this));
- }
- }
};
pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(),
- keep_alive_sockets: std.bit_set.IntegerBitSet(pool_size + 1) = std.bit_set.IntegerBitSet(pool_size + 1).initEmpty(),
us_socket_context: *uws.us_socket_context_t,
const Context = @This();
pub const HTTPSocket = uws.NewSocketHandler(ssl);
+ pub fn context() *@This() {
+ if (comptime ssl) {
+ return &http_thread.https_context;
+ } else {
+ return &http_thread.http_context;
+ }
+ }
+
const ActiveSocket = TaggedPointerUnion(.{
+ DeadSocket,
HTTPClient,
PooledSocket,
});
@@ -80,7 +81,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
pub fn init(this: *@This()) !void {
var opts: uws.us_socket_context_options_t = undefined;
@memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t));
- this.us_socket_context = uws.us_create_socket_context(ssl_int, uws.Loop.get(), @sizeOf(usize), opts).?;
+ this.us_socket_context = uws.us_create_socket_context(ssl_int, http_thread.loop, @sizeOf(usize), opts).?;
HTTPSocket.configure(
this.us_socket_context,
@@ -97,22 +98,26 @@ fn NewHTTPContext(comptime ssl: bool) type {
std.debug.assert(!socket.isShutdown());
std.debug.assert(socket.isEstablished());
}
+ std.debug.assert(hostname.len > 0);
+ std.debug.assert(port > 0);
- if (hostname.len <= MAX_KEEPALIVE_HOSTNAME) {
+ if (hostname.len <= MAX_KEEPALIVE_HOSTNAME and !socket.isClosed() and !socket.isShutdown() and socket.isEstablished()) {
if (this.pending_sockets.get()) |pending| {
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr());
+ socket.flush();
+ socket.timeout(60);
+
pending.http_socket = socket;
@memcpy(&pending.hostname_buf, hostname.ptr, hostname.len);
pending.hostname_len = @truncate(u8, hostname.len);
pending.port = port;
- this.keep_alive_sockets.set(
- this.pending_sockets.indexOf(pending).?,
- );
- pending.http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr());
- log("Releasing socket for reuse {s}:{d}", .{ hostname, port });
+
+ log("- Keep-Alive release {s}:{d}", .{ hostname, port });
return;
}
}
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
socket.close(0, null);
}
@@ -121,9 +126,20 @@ fn NewHTTPContext(comptime ssl: bool) type {
ptr: *anyopaque,
socket: HTTPSocket,
) void {
- if (ActiveSocket.from(bun.cast(**anyopaque, ptr).*).get(HTTPClient)) |client| {
+ const active = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ if (active.get(HTTPClient)) |client| {
return client.onOpen(comptime ssl, socket);
}
+
+ if (active.get(PooledSocket)) |pooled| {
+ std.debug.assert(context().pending_sockets.put(pooled));
+ }
+
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
+ socket.close(0, null);
+ if (comptime Environment.allow_assert) {
+ std.debug.assert(false);
+ }
}
pub fn onClose(
ptr: *anyopaque,
@@ -132,15 +148,17 @@ fn NewHTTPContext(comptime ssl: bool) type {
_: ?*anyopaque,
) void {
var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
+
if (tagged.get(HTTPClient)) |client| {
return client.onClose(comptime ssl, socket);
}
- if (tagged.get(PooledSocket)) |client| {
- return client.close();
+ if (tagged.get(PooledSocket)) |pooled| {
+ std.debug.assert(context().pending_sockets.put(pooled));
}
- unreachable;
+ return;
}
pub fn onData(
ptr: *anyopaque,
@@ -155,9 +173,9 @@ fn NewHTTPContext(comptime ssl: bool) type {
if (comptime ssl) &http_thread.https_context else &http_thread.http_context,
socket,
);
+ } else {
+ log("Unexpected data on socket", .{});
}
-
- unreachable;
}
pub fn onWritable(
ptr: *anyopaque,
@@ -171,24 +189,26 @@ fn NewHTTPContext(comptime ssl: bool) type {
socket,
);
}
-
- unreachable;
}
pub fn onTimeout(
ptr: *anyopaque,
socket: HTTPSocket,
) void {
var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ socket.ext(**anyopaque).?.* = bun.cast(
+ **anyopaque,
+ ActiveSocket.init(&dead_socket).ptr(),
+ );
+
if (tagged.get(HTTPClient)) |client| {
return client.onTimeout(
comptime ssl,
socket,
);
} else if (tagged.get(PooledSocket)) |pooled| {
- pooled.close();
+ std.debug.assert(context().pending_sockets.put(pooled));
+ return;
}
-
- unreachable;
}
pub fn onConnectError(
ptr: *anyopaque,
@@ -202,7 +222,8 @@ fn NewHTTPContext(comptime ssl: bool) type {
socket,
);
} else if (tagged.get(PooledSocket)) |pooled| {
- pooled.close();
+ std.debug.assert(context().pending_sockets.put(pooled));
+ return;
}
unreachable;
@@ -212,13 +233,17 @@ fn NewHTTPContext(comptime ssl: bool) type {
socket: HTTPSocket,
) void {
var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(dead_socket).ptr());
+
if (tagged.get(HTTPClient)) |client| {
return client.onEnd(
comptime ssl,
socket,
);
} else if (tagged.get(PooledSocket)) |pooled| {
- pooled.close();
+ std.debug.assert(context().pending_sockets.put(pooled));
+
+ return;
}
unreachable;
@@ -229,22 +254,30 @@ fn NewHTTPContext(comptime ssl: bool) type {
if (hostname.len > MAX_KEEPALIVE_HOSTNAME)
return null;
- var iter = this.keep_alive_sockets.iterator(.{
- .kind = .set,
- });
- while (iter.next()) |index_i| {
- const index = @truncate(u16, index_i);
- var socket = this.pending_sockets.at(index);
+ var iter = this.pending_sockets.available.iterator(.{ .kind = .unset });
+
+ while (iter.next()) |pending_socket_index| {
+ var socket = this.pending_sockets.at(@intCast(u16, pending_socket_index));
if (socket.port != port) {
continue;
}
- std.debug.assert(!this.pending_sockets.available.isSet(index));
-
if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) {
const http_socket = socket.http_socket;
- socket.close();
- log("Keep-alive socket found for {s}:{d}.", .{ hostname, port });
+ std.debug.assert(context().pending_sockets.put(socket));
+
+ if (http_socket.isClosed()) {
+ http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
+ continue;
+ }
+
+ if (http_socket.isShutdown()) {
+ http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr());
+ http_socket.close(0, null);
+ continue;
+ }
+
+ log("+ Keep-Alive reuse {s}:{d}", .{ hostname, port });
return http_socket;
}
}
@@ -252,16 +285,22 @@ fn NewHTTPContext(comptime ssl: bool) type {
return null;
}
- pub fn connect(this: *@This(), client: *HTTPClient, hostname_: []const u8, port: u16) !HTTPSocket {
- const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost"))
- "127.0.0.1"
- else
- hostname_;
-
- if (this.existingSocket(hostname, port)) |sock| {
- sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
- client.onOpen(comptime ssl, sock);
- return sock;
+ pub fn connect(this: *@This(), client: *HTTPClient, hostname: []const u8, port: u16) !HTTPSocket {
+ // const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost"))
+ // "127.0.0.1"
+ // else
+ // hostname_;
+
+ client.connected_url = client.url;
+ client.connected_url.hostname = hostname;
+
+ if (comptime FeatureFlags.enable_keepalive) {
+ if (this.existingSocket(hostname, port)) |sock| {
+ sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
+ client.allow_retry = true;
+ client.onOpen(comptime ssl, sock);
+ return sock;
+ }
}
if (HTTPSocket.connectAnon(
@@ -270,6 +309,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
this.us_socket_context,
undefined,
)) |socket| {
+ client.allow_retry = false;
socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr());
return socket;
}
@@ -279,6 +319,9 @@ fn NewHTTPContext(comptime ssl: bool) type {
};
}
+const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue;
+const Queue = UnboundedQueue(AsyncHTTP, .next);
+
pub const HTTPThread = struct {
var http_thread_loaded: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false);
@@ -286,9 +329,7 @@ pub const HTTPThread = struct {
http_context: NewHTTPContext(false),
https_context: NewHTTPContext(true),
- queued_tasks_mutex: Lock = Lock.init(),
- queued_tasks: Batch = .{},
- processing_tasks: Batch = .{},
+ queued_tasks: Queue = Queue{},
has_awoken: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
timer: std.time.Timer = undefined,
const threadlog = Output.scoped(.HTTPThread, true);
@@ -319,8 +360,15 @@ pub const HTTPThread = struct {
Output.Source.configureNamedThread("HTTP Client");
default_arena = Arena.init() catch unreachable;
default_allocator = default_arena.allocator();
- var loop = uws.Loop.get().?;
- _ = loop.addPostHandler(*HTTPThread, &http_thread, drainEvents);
+ var loop = uws.Loop.create(struct {
+ pub fn wakeup(_: *uws.Loop) callconv(.C) void {
+ http_thread.drainEvents();
+ }
+
+ pub fn pre(_: *uws.Loop) callconv(.C) void {}
+ pub fn post(_: *uws.Loop) callconv(.C) void {}
+ });
+
http_thread.loop = loop;
http_thread.http_context.init() catch @panic("Failed to init http context");
http_thread.https_context.init() catch @panic("Failed to init https context");
@@ -328,16 +376,6 @@ pub const HTTPThread = struct {
http_thread.processEvents();
}
- fn queueEvents(this: *@This()) void {
- this.queued_tasks_mutex.lock();
- defer this.queued_tasks_mutex.unlock();
- if (this.queued_tasks.len == 0)
- return;
- threadlog("Received {d} tasks\n", .{this.queued_tasks.len});
- this.processing_tasks.push(this.queued_tasks);
- this.queued_tasks = .{};
- }
-
pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket {
return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto());
}
@@ -347,25 +385,33 @@ pub const HTTPThread = struct {
}
fn drainEvents(this: *@This()) void {
- this.queueEvents();
-
var count: usize = 0;
+ var remaining: usize = AsyncHTTP.max_simultaneous_requests - AsyncHTTP.active_requests_count.loadUnchecked();
+ if (remaining == 0) return;
+ defer {
+ if (comptime Environment.allow_assert) {
+ if (count > 0)
+ log("Processed {d} tasks\n", .{count});
+ }
+ }
- while (this.processing_tasks.pop()) |task| {
- var callback = task.callback;
- callback(task);
+ while (this.queued_tasks.pop()) |http| {
+ var cloned = default_allocator.create(AsyncHTTP) catch unreachable;
+ cloned.* = http.*;
+ cloned.real = http;
+ cloned.onStart();
if (comptime Environment.allow_assert) {
count += 1;
}
- }
- if (comptime Environment.allow_assert) {
- if (count > 0)
- log("Processed {d} tasks\n", .{count});
+ remaining -= 1;
+ if (remaining == 0) break;
}
}
fn processEvents_(this: *@This()) void {
+ this.loop.num_polls = @maximum(2, this.loop.num_polls);
+
while (true) {
this.drainEvents();
@@ -392,9 +438,11 @@ pub const HTTPThread = struct {
return;
{
- this.queued_tasks_mutex.lock();
- defer this.queued_tasks_mutex.unlock();
- this.queued_tasks.push(batch);
+ var batch_ = batch;
+ while (batch_.pop()) |task| {
+ var http: *AsyncHTTP = @fieldParentPtr(AsyncHTTP, "task", task);
+ this.queued_tasks.push(http);
+ }
}
if (this.has_awoken.load(.Monotonic))
@@ -439,6 +487,12 @@ pub fn onClose(
_ = socket;
log("Closed {s}\n", .{client.url.href});
+ if (client.allow_retry) {
+ client.allow_retry = false;
+ client.start(client.state.request_body, client.state.body_out_str.?);
+ return;
+ }
+
if (client.state.stage != .done and client.state.stage != .fail)
client.fail(error.ConnectionClosed);
}
@@ -554,12 +608,9 @@ pub const InternalState = struct {
this.compressed_body = null;
}
- if (this.body_out_str) |body| {
- body.reset();
- }
-
+ var body_msg = this.body_out_str;
this.* = .{
- .body_out_str = this.body_out_str,
+ .body_out_str = body_msg,
};
}
@@ -632,6 +683,7 @@ connected_url: URL = URL{},
allocator: std.mem.Allocator,
verbose: bool = Environment.isTest,
remaining_redirect_count: i8 = default_redirect_count,
+allow_retry: bool = false,
redirect: ?*URLBufferPool.Node = null,
timeout: usize = 0,
progress_node: ?*std.Progress.Node = null,
@@ -664,13 +716,11 @@ pub fn init(
};
}
-pub fn deinit(this: *HTTPClient) !void {
+pub fn deinit(this: *HTTPClient) void {
if (this.redirect) |redirect| {
redirect.release();
this.redirect = null;
}
-
- this.state.reset();
}
const Stage = enum(u8) {
@@ -772,74 +822,20 @@ pub const HTTPChannelContext = struct {
}
};
-// This causes segfaults when resume connect()
-pub const KeepAlive = struct {
- const limit = 2;
- pub const disabled = true;
- fds: [limit]u32 = undefined,
- hosts: [limit]u64 = undefined,
- ports: [limit]u16 = undefined,
- used: u8 = 0,
-
- pub var instance = KeepAlive{};
-
- pub fn append(this: *KeepAlive, host: []const u8, port: u16, fd: os.socket_t) bool {
- if (disabled) return false;
- if (this.used >= limit or fd > std.math.maxInt(u32)) return false;
-
- const i = this.used;
- const hash = std.hash.Wyhash.hash(0, host);
-
- this.fds[i] = @truncate(u32, @intCast(u64, fd));
- this.hosts[i] = hash;
- this.ports[i] = port;
- this.used += 1;
- return true;
- }
- pub fn find(this: *KeepAlive, host: []const u8, port: u16) ?os.socket_t {
- if (disabled) return null;
-
- if (this.used == 0) {
- return null;
- }
-
- const hash = std.hash.Wyhash.hash(0, host);
- const list = this.hosts[0..this.used];
- for (list) |host_hash, i| {
- if (host_hash == hash and this.ports[i] == port) {
- const fd = this.fds[i];
- const last = this.used - 1;
-
- if (i > last) {
- const end_host = this.hosts[last];
- const end_fd = this.fds[last];
- const end_port = this.ports[last];
- this.hosts[i] = end_host;
- this.fds[i] = end_fd;
- this.ports[i] = end_port;
- }
- this.used -= 1;
-
- return @intCast(os.socket_t, fd);
- }
- }
-
- return null;
- }
-};
-
pub const AsyncHTTP = struct {
request: ?picohttp.Request = null,
response: ?picohttp.Response = null,
request_headers: Headers.Entries = Headers.Entries{},
response_headers: Headers.Entries = Headers.Entries{},
response_buffer: *MutableString,
- request_body: *MutableString,
+ request_body: []const u8 = "",
allocator: std.mem.Allocator,
request_header_buf: string = "",
method: Method = Method.GET,
max_retry_count: u32 = 0,
url: URL,
+ real: ?*AsyncHTTP = null,
+ next: ?*AsyncHTTP = null,
task: ThreadPool.Task = ThreadPool.Task{ .callback = startAsyncHTTP },
completion_callback: HTTPClientResult.Callback = undefined,
@@ -859,8 +855,15 @@ pub const AsyncHTTP = struct {
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
- pub var active_requests_count = std.atomic.Atomic(u32).init(0);
- pub var max_simultaneous_requests: u16 = 32;
+ pub var active_requests_count = std.atomic.Atomic(usize).init(0);
+ pub var max_simultaneous_requests: usize = 256;
+
+ pub fn deinit(this: *AsyncHTTP) void {
+ this.response_headers.deinit(this.allocator);
+ this.response_headers = .{};
+ this.request = null;
+ this.response = null;
+ }
pub const State = enum(u32) {
pending = 0,
@@ -878,7 +881,7 @@ pub const AsyncHTTP = struct {
headers: Headers.Entries,
headers_buf: string,
response_buffer: *MutableString,
- request_body: *MutableString,
+ request_body: []const u8,
timeout: usize,
callback: HTTPClientResult.Callback,
) AsyncHTTP {
@@ -905,7 +908,7 @@ pub const AsyncHTTP = struct {
headers: Headers.Entries,
headers_buf: string,
response_buffer: *MutableString,
- request_body: *MutableString,
+ request_body: []const u8,
timeout: usize,
) AsyncHTTP {
return @This().init(
@@ -929,7 +932,6 @@ pub const AsyncHTTP = struct {
}
pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void {
- HTTPThread.init() catch unreachable;
this.state.store(.scheduled, .Monotonic);
batch.push(ThreadPool.Batch.from(&this.task));
}
@@ -953,7 +955,7 @@ pub const AsyncHTTP = struct {
http_thread.schedule(batch);
while (true) {
const result: HTTPClientResult = ctx.channel.readItem() catch unreachable;
- if (result.fail != error.NoError) {
+ if (!result.isSuccess()) {
return result.fail;
}
@@ -964,23 +966,44 @@ pub const AsyncHTTP = struct {
}
pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void {
+ std.debug.assert(this.real != null);
+ const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
+ std.debug.assert(active_requests > 0);
+
var completion = this.completion_callback;
this.response = result.response;
this.elapsed = http_thread.timer.read() -| this.elapsed;
this.redirected = this.client.remaining_redirect_count != default_redirect_count;
- if (result.fail != error.NoError) {
+ if (!result.isSuccess()) {
this.err = result.fail;
this.state.store(State.fail, .Monotonic);
} else {
this.err = null;
this.state.store(.success, .Monotonic);
}
+ this.client.deinit();
+
+ this.real.?.* = this.*;
+ this.real.?.response_buffer = this.response_buffer;
+
+ log("onAsyncHTTPComplete: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+
+ default_allocator.destroy(this);
completion.function(completion.ctx, result);
+
+ if (active_requests == AsyncHTTP.max_simultaneous_requests) {
+ http_thread.drainEvents();
+ }
}
pub fn startAsyncHTTP(task: *Task) void {
var this = @fieldParentPtr(AsyncHTTP, "task", task);
+ this.onStart();
+ }
+
+ pub fn onStart(this: *AsyncHTTP) void {
+ _ = active_requests_count.fetchAdd(1, .Monotonic);
this.err = null;
this.state.store(.sending, .Monotonic);
this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init(
@@ -988,8 +1011,12 @@ pub const AsyncHTTP = struct {
);
this.elapsed = http_thread.timer.read();
+ if (this.response_buffer.list.capacity == 0) {
+ this.response_buffer.allocator = default_allocator;
+ }
+ this.client.start(this.request_body, this.response_buffer);
- this.client.start(this.request_body.list.items, this.response_buffer);
+ log("onStart: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
}
};
@@ -1130,7 +1157,6 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString)
}
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
- this.connected_url = this.url;
var socket = http_thread.connect(this, is_ssl) catch |err| {
this.fail(err);
return;
@@ -1161,8 +1187,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
writer,
request,
) catch {
- this.fail(error.OutOfMemory);
- socket.close(0, null);
+ this.closeAndFail(error.OutOfMemory, is_ssl, socket);
return;
};
@@ -1187,8 +1212,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
if (amount < 0) {
- this.fail(error.WriteFailed);
- socket.close(0, null);
+ this.closeAndFail(error.WriteFailed, is_ssl, socket);
return;
}
@@ -1219,8 +1243,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
const to_send = this.state.request_body;
const amount = socket.write(to_send, true);
if (amount < 0) {
- this.fail(error.WriteFailed);
- socket.close(0, null);
+ this.closeAndFail(error.WriteFailed, is_ssl, socket);
return;
}
@@ -1236,6 +1259,15 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
}
+pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ socket.ext(**anyopaque).?.* = bun.cast(
+ **anyopaque,
+ NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(),
+ );
+ this.fail(err);
+ socket.close(0, null);
+}
+
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
switch (this.state.response_stage) {
.pending, .headers => {
@@ -1244,28 +1276,32 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
var amount_read: usize = 0;
var needs_move = true;
if (this.state.request_message) |req_msg| {
- var available = req_msg.available();
+ var available = req_msg.buf;
if (available.len == 0) {
this.state.request_message.?.release();
this.state.request_message = null;
this.fail(error.ResponseHeadersTooLarge);
+ socket.shutdown();
socket.close(0, null);
return;
}
+ const wrote = @minimum(available.len - req_msg.used, incoming_data.len);
@memcpy(
- req_msg.available().ptr,
+ available.ptr + req_msg.used,
incoming_data.ptr,
- @minimum(available.len, incoming_data.len),
+ wrote,
);
- req_msg.used += @truncate(u32, incoming_data.len);
- amount_read = @truncate(u32, req_msg.sent);
+ req_msg.used += @truncate(u32, wrote);
+ amount_read = 0;
req_msg.sent = 0;
needs_move = false;
- to_read = req_msg.slice();
- pending_buffers[1] = incoming_data[@minimum(available.len, incoming_data.len)..];
+ to_read = available[0..req_msg.used];
+ pending_buffers[1] = incoming_data[wrote..];
}
+ this.state.pending_response = picohttp.Response{};
+
const response = picohttp.Response.parseParts(
to_read,
&this.response_headers_buf,
@@ -1273,29 +1309,28 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
) catch |err| {
switch (err) {
error.ShortRead => {
- socket.timeout(60);
if (needs_move) {
std.debug.assert(this.state.request_message == null);
this.state.request_message = AsyncMessage.get(default_allocator);
if (to_read.len > this.state.request_message.?.buf.len) {
- this.fail(error.ResponseHeadersTooLarge);
- socket.close(0, null);
+ this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket);
return;
}
_ = this.state.request_message.?.writeAll(incoming_data);
- this.state.request_message.?.sent = @truncate(u32, to_read.len);
- return;
+ this.state.request_message.?.sent = @truncate(u32, amount_read);
}
+
+ socket.timeout(60);
},
else => {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
},
}
- unreachable;
+ return;
};
+
pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..];
if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) {
pending_buffers[0] = pending_buffers[1];
@@ -1318,9 +1353,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
this.state.request_message = null;
}
- if (this.state.allow_keepalive) {
+ if (this.state.allow_keepalive and FeatureFlags.enable_keepalive) {
std.debug.assert(this.connected_url.hostname.len > 0);
- ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto());
+ ctx.releaseSocket(
+ socket,
+ this.connected_url.hostname,
+ this.connected_url.getPortAuto(),
+ );
} else {
socket.close(0, null);
}
@@ -1335,8 +1374,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
return;
}
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1352,8 +1390,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
if (this.state.response_stage == .body) {
{
const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1365,8 +1402,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
if (pending_buffers[1].len > 0) {
const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1378,8 +1414,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
} else if (this.state.response_stage == .body_chunk) {
{
const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1391,8 +1426,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
if (pending_buffers[1].len > 0) {
const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1410,8 +1444,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
socket.timeout(60);
const is_done = this.handleResponseBody(incoming_data) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1425,8 +1458,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
socket.timeout(60);
const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| {
- socket.close(0, null);
- this.fail(err);
+ this.closeAndFail(err, is_ssl, socket);
return;
};
@@ -1439,8 +1471,8 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
.fail => {},
else => {
- socket.close(0, null);
- this.fail(error.UnexpectedData);
+ this.state.pending_response = .{};
+ this.closeAndFail(error.UnexpectedData, is_ssl, socket);
return;
},
}
@@ -1468,9 +1500,14 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss
this.state.request_stage = .done;
this.state.stage = .done;
- if (this.state.allow_keepalive and !socket.isClosed()) {
- socket.timeout(60 * 5);
- ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto());
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
+
+ if (this.state.allow_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) {
+ ctx.releaseSocket(
+ socket,
+ this.connected_url.hostname,
+ this.connected_url.getPortAuto(),
+ );
} else if (!socket.isClosed()) {
socket.close(0, null);
}
@@ -1481,21 +1518,35 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
-
+ if (comptime print_every > 0) {
+ print_every_i += 1;
+ if (print_every_i % print_every == 0) {
+ Output.prettyln("Heap stats for HTTP thread\n", .{});
+ Output.flush();
+ default_arena.dumpThreadStats();
+ print_every_i = 0;
+ }
+ }
callback.run(result);
}
pub const HTTPClientResult = struct {
body: ?*MutableString = null,
- response: picohttp.Response,
+ response: picohttp.Response = .{},
metadata_buf: []u8 = &.{},
href: []const u8 = "",
fail: anyerror = error.NoError,
+ redirected: bool = false,
headers_buf: []picohttp.Header = &.{},
+ pub fn isSuccess(this: *const HTTPClientResult) bool {
+ return this.fail == error.NoError;
+ }
+
pub fn deinitMetadata(this: *HTTPClientResult) void {
if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
+
this.headers_buf = &.{};
this.metadata_buf = &.{};
this.href = "";
@@ -1545,15 +1596,30 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
.body = this.state.body_out_str,
.response = response,
.metadata_buf = builder.ptr.?[0..builder.cap],
+ .redirected = this.remaining_redirect_count != default_redirect_count,
.href = href,
.fail = this.state.fail,
.headers_buf = headers_buf,
};
}
+// preallocate a buffer for the body no more than 256 MB
+// the intent is to avoid an OOM caused by a malicious server
+// reporting gigantic Conten-Length and then
+// never finishing sending the body
+const preallocate_max = 1024 * 1024 * 256;
+
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer();
+ if (buffer.list.items.len == 0 and
+ this.state.body_size > 0 and this.state.body_size < preallocate_max)
+ {
+ // since we don't do streaming yet, we might as well just allocate the whole thing
+ // when we know the expected size
+ buffer.list.ensureTotalCapacityPrecise(buffer.allocator, this.state.body_size) catch {};
+ }
+
const remaining_content_length = this.state.body_size - buffer.list.items.len;
var remainder = incoming_data[0..@minimum(incoming_data.len, remaining_content_length)];
@@ -1683,7 +1749,7 @@ pub fn handleResponseMetadata(
location = header.value;
},
hashHeaderName("Connection") => {
- if (response.status_code >= 200 and response.status_code <= 299 and !KeepAlive.disabled) {
+ if (response.status_code >= 200 and response.status_code <= 299) {
if (!strings.eqlComptime(header.value, "keep-alive")) {
this.state.allow_keepalive = false;
}
diff --git a/src/install/install.zig b/src/install/install.zig
index f0929cd3d..93b3def58 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -317,7 +317,6 @@ const NetworkTask = struct {
header_builder.content = GlobalStringBuilder{ .ptr = @intToPtr([*]u8, @ptrToInt(std.mem.span(default_headers_buf).ptr)), .len = default_headers_buf.len, .cap = default_headers_buf.len };
}
- this.request_buffer = try MutableString.init(allocator, 0);
this.response_buffer = try MutableString.init(allocator, 0);
this.allocator = allocator;
this.http = AsyncHTTP.init(
@@ -327,7 +326,7 @@ const NetworkTask = struct {
header_builder.entries,
header_builder.content.ptr.?[0..header_builder.content.len],
&this.response_buffer,
- &this.request_buffer,
+ "",
0,
this.getCompletionCallback(),
);
@@ -376,7 +375,6 @@ const NetworkTask = struct {
this.url_buf = tarball.url;
}
- this.request_buffer = try MutableString.init(allocator, 0);
this.response_buffer = try MutableString.init(allocator, 0);
this.allocator = allocator;
@@ -410,7 +408,7 @@ const NetworkTask = struct {
header_builder.entries,
header_buf,
&this.response_buffer,
- &this.request_buffer,
+ "",
0,
this.getCompletionCallback(),
);
diff --git a/src/mimalloc_arena.zig b/src/mimalloc_arena.zig
index c8117b6f5..0f54cdea6 100644
--- a/src/mimalloc_arena.zig
+++ b/src/mimalloc_arena.zig
@@ -31,9 +31,14 @@ pub const Arena = struct {
pub fn deinit(this: *Arena) void {
mimalloc.mi_heap_destroy(this.heap);
+
this.heap = null;
}
+ pub fn dumpThreadStats(_: *Arena) void {
+ mimalloc.mi_thread_stats_print_out(null, null);
+ }
+
pub fn reset(this: *Arena) void {
this.deinit();
this.* = init() catch unreachable;
diff --git a/src/napi/napi.zig b/src/napi/napi.zig
index 84a1008e9..54e16d153 100644
--- a/src/napi/napi.zig
+++ b/src/napi/napi.zig
@@ -10,14 +10,39 @@ const TODO_EXCEPTION: JSC.C.ExceptionRef = null;
const Channel = @import("../sync.zig").Channel;
pub const napi_env = *JSC.JSGlobalObject;
-pub const napi_ref = struct_napi_ref__;
+pub const Ref = opaque {
+ pub fn create(globalThis: *JSC.JSGlobalObject, value: JSValue) *Ref {
+ var ref: *Ref = undefined;
+ std.debug.assert(
+ napi_create_reference(
+ globalThis,
+ value,
+ 1,
+ &ref,
+ ) == .ok,
+ );
+ if (comptime bun.Environment.isDebug) {
+ std.debug.assert(ref.get(globalThis) == value);
+ }
+ return ref;
+ }
+
+ pub fn get(ref: *Ref, globalThis: *JSC.JSGlobalObject) JSValue {
+ var value: JSValue = JSValue.zero;
+ std.debug.assert(napi_get_reference_value(globalThis, ref, &value) == .ok);
+ return value;
+ }
+
+ pub fn destroy(ref: *Ref, globalThis: *JSC.JSGlobalObject) void {
+ std.debug.assert(napi_delete_reference(globalThis, ref) == .ok);
+ }
+};
pub const napi_handle_scope = napi_env;
pub const napi_escapable_handle_scope = struct_napi_escapable_handle_scope__;
pub const napi_callback_info = *JSC.CallFrame;
pub const napi_deferred = *JSC.JSPromise;
pub const napi_value = JSC.JSValue;
-pub const struct_napi_ref__ = opaque {};
pub const struct_napi_escapable_handle_scope__ = opaque {};
pub const struct_napi_deferred__ = opaque {};
@@ -629,16 +654,16 @@ pub extern fn napi_define_class(
properties: [*c]const napi_property_descriptor,
result: *napi_value,
) napi_status;
-pub extern fn napi_wrap(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: [*c]napi_ref) napi_status;
+pub extern fn napi_wrap(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: [*c]Ref) napi_status;
pub extern fn napi_unwrap(env: napi_env, js_object: napi_value, result: [*]*anyopaque) napi_status;
pub extern fn napi_remove_wrap(env: napi_env, js_object: napi_value, result: [*]*anyopaque) napi_status;
pub extern fn napi_create_external(env: napi_env, data: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *napi_value) napi_status;
pub extern fn napi_get_value_external(env: napi_env, value: napi_value, result: [*]*anyopaque) napi_status;
-pub extern fn napi_create_reference(env: napi_env, value: napi_value, initial_refcount: u32, result: [*c]napi_ref) napi_status;
-pub extern fn napi_delete_reference(env: napi_env, ref: napi_ref) napi_status;
-pub extern fn napi_reference_ref(env: napi_env, ref: napi_ref, result: [*c]u32) napi_status;
-pub extern fn napi_reference_unref(env: napi_env, ref: napi_ref, result: [*c]u32) napi_status;
-pub extern fn napi_get_reference_value(env: napi_env, ref: napi_ref, result: *napi_value) napi_status;
+pub extern fn napi_create_reference(env: napi_env, value: napi_value, initial_refcount: u32, result: **Ref) napi_status;
+pub extern fn napi_delete_reference(env: napi_env, ref: *Ref) napi_status;
+pub extern fn napi_reference_ref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status;
+pub extern fn napi_reference_unref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status;
+pub extern fn napi_get_reference_value(env: napi_env, ref: *Ref, result: *napi_value) napi_status;
// JSC scans the stack
// we don't need this
@@ -818,7 +843,7 @@ pub export fn napi_get_date_value(env: napi_env, value: napi_value, result: *f64
).asNumber();
return .ok;
}
-pub extern fn napi_add_finalizer(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *napi_ref) napi_status;
+pub extern fn napi_add_finalizer(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *Ref) napi_status;
pub export fn napi_create_bigint_int64(env: napi_env, value: i64, result: *napi_value) napi_status {
result.* = JSC.JSValue.fromInt64NoTruncate(env, value);
return .ok;
@@ -853,6 +878,7 @@ const WorkPoolTask = @import("../work_pool.zig").Task;
/// must be globally allocated
pub const napi_async_work = struct {
task: WorkPoolTask = .{ .callback = runFromThreadPool },
+ concurrent_task: JSC.ConcurrentTask = .{},
completion_task: ?*anyopaque = null,
event_loop: *JSC.EventLoop,
global: napi_env,
@@ -900,7 +926,7 @@ pub const napi_async_work = struct {
this.execute.?(this.global, this.ctx);
this.status.store(@enumToInt(Status.completed), .SeqCst);
- this.event_loop.enqueueTaskConcurrent(JSC.Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn schedule(this: *napi_async_work) void {
@@ -1139,6 +1165,8 @@ pub const ThreadSafeFunction = struct {
owning_threads: std.AutoArrayHashMapUnmanaged(u64, void) = .{},
owning_thread_lock: Lock = Lock.init(),
event_loop: *JSC.EventLoop,
+ concurrent_task: JSC.ConcurrentTask = .{},
+ concurrent_finalizer_task: JSC.ConcurrentTask = .{},
javascript_function: JSValue,
finalizer_task: JSC.AnyTask = undefined,
@@ -1243,7 +1271,7 @@ pub const ThreadSafeFunction = struct {
}
}
- this.event_loop.enqueueTaskConcurrent(JSC.Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn finalize(opaq: *anyopaque) void {
@@ -1284,7 +1312,7 @@ pub const ThreadSafeFunction = struct {
if (this.owning_threads.count() == 0) {
this.finalizer_task = JSC.AnyTask{ .ctx = this, .callback = finalize };
- this.event_loop.enqueueTaskConcurrent(JSC.Task.init(&this.finalizer_task));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_finalizer_task.from(&this.finalizer_task));
return;
}
}
diff --git a/src/pool.zig b/src/pool.zig
index 7894a5758..344e9ca69 100644
--- a/src/pool.zig
+++ b/src/pool.zig
@@ -179,6 +179,10 @@ pub fn ObjectPool(
return node;
}
+ pub fn first(allocator: std.mem.Allocator) *Type {
+ return &get(allocator).data;
+ }
+
pub fn get(allocator: std.mem.Allocator) *LinkedList.Node {
if (data().loaded) {
if (data().list.popFirst()) |node| {
@@ -204,6 +208,10 @@ pub fn ObjectPool(
return new_node;
}
+ pub fn releaseValue(value: *Type) void {
+ @fieldParentPtr(LinkedList.Node, "data", value).release();
+ }
+
pub fn release(node: *LinkedList.Node) void {
if (comptime max_count > 0) {
if (data().count >= max_count) {
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index 8839d2090..eac9a2055 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -93,6 +93,8 @@ pub const Batch = struct {
if (task.node.next) |node| {
this.head = @fieldParentPtr(Task, "node", node);
} else {
+ if (task != this.tail.?) unreachable;
+ this.tail = null;
this.head = null;
}