diff options
32 files changed, 1008 insertions, 677 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json index 024fe600f..3d5f4b58b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -29,7 +29,7 @@ "type": "lldb", "request": "launch", "name": "bun run current file", - "program": "/build/bun/packages/debug-bun-linux-x64/bun-debug", + "program": "bun-debug", "args": ["${file}"], "cwd": "${file}/../../", "env": { @@ -807,7 +807,7 @@ ifeq ($(OS_NAME),darwin) # Hardened runtime will not work with debugging bun-codesign-debug: - codesign --entitlements $(realpath entitlements.plist) --force --timestamp --sign "$(CODESIGN_IDENTITY)" -vvvv --deep --strict $(DEBUG_BUN) + codesign --entitlements $(realpath entitlements.debug.plist) --force --timestamp --sign "$(CODESIGN_IDENTITY)" -vvvv --deep --strict $(DEBUG_BUN) bun-codesign-release-local: codesign --entitlements $(realpath entitlements.plist) --options runtime --force --timestamp --sign "$(CODESIGN_IDENTITY)" -vvvv --deep --strict $(RELEASE_BUN) diff --git a/misctools/fetch.zig b/misctools/fetch.zig index ccb2a078f..0b990572f 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -172,15 +172,11 @@ pub fn main() anyerror!void { var args = try Arguments.parse(default_allocator); var body_out_str = try MutableString.init(default_allocator, 1024); - var body_in_str = try MutableString.init(default_allocator, args.body.len); - body_in_str.appendAssumeCapacity(args.body); var channel = try default_allocator.create(HTTP.HTTPChannel); channel.* = HTTP.HTTPChannel.init(); var response_body_string = try default_allocator.create(MutableString); response_body_string.* = body_out_str; - var request_body_string = try default_allocator.create(MutableString); - request_body_string.* = body_in_str; try channel.buffer.ensureTotalCapacity(1); @@ -196,7 +192,7 @@ pub fn main() anyerror!void { args.headers, args.headers_buf, response_body_string, - request_body_string, + args.body, 0, ), diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig index 4df9fa6f0..dbec009f9 100644 --- a/misctools/http_bench.zig +++ b/misctools/http_bench.zig @@ -201,7 +201,6 @@ pub fn main() anyerror!void { if (args.concurrency > 0) HTTP.AsyncHTTP.max_simultaneous_requests = args.concurrency; const Group = struct { response_body: MutableString = undefined, - request_body: MutableString = undefined, context: HTTP.HTTPChannelContext = undefined, }; const Batch = @import("../src/thread_pool.zig").Batch; @@ -214,8 +213,6 @@ pub fn main() anyerror!void { groups[i] = Group{}; var response_body = &groups[i].response_body; response_body.* = try MutableString.init(default_allocator, 1024); - var request_body = &groups[i].request_body; - request_body.* = try MutableString.init(default_allocator, 0); var ctx = &groups[i].context; ctx.* = .{ @@ -226,8 +223,8 @@ pub fn main() anyerror!void { args.url, args.headers, args.headers_buf, - request_body, response_body, + "", args.timeout, ), }; diff --git a/src/analytics/analytics_thread.zig b/src/analytics/analytics_thread.zig index 82f435025..75cd3165b 100644 --- a/src/analytics/analytics_thread.zig +++ b/src/analytics/analytics_thread.zig @@ -404,7 +404,7 @@ fn readloop() anyerror!void { headers_entries, headers_buf, &out_buffer, - &event_list.in_buffer, + "", std.time.ns_per_ms * 10000, ) catch return; diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index a37d5d62c..3fafdc177 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -1101,6 +1101,9 @@ pub const Class = NewClass( .nanoseconds = .{ .rfn = nanoseconds, }, + .DO_NOT_USE_OR_YOU_WILL_BE_FIRED_mimalloc_dump = .{ + .rfn = dump_mimalloc, + }, .gzipSync = .{ .rfn = JSC.wrapWithHasContainer(JSZlib, "gzipSync", false, false, true), }, @@ -1191,6 +1194,18 @@ pub const Class = NewClass( }, ); +fn dump_mimalloc( + _: void, + globalThis: JSC.C.JSContextRef, + _: JSC.C.JSObjectRef, + _: JSC.C.JSObjectRef, + _: []const JSC.C.JSValueRef, + _: JSC.C.ExceptionRef, +) JSC.C.JSValueRef { + globalThis.bunVM().arena.dumpThreadStats(); + return JSC.JSValue.jsUndefined().asObjectRef(); +} + pub const Crypto = struct { const Hashers = @import("../../sha.zig"); @@ -2101,6 +2116,8 @@ pub const Timer = struct { return VirtualMachine.vm.timer.last_id; } + const Pool = bun.ObjectPool(Timeout, null, true, 1000); + pub const Timeout = struct { id: i32 = 0, callback: JSValue, @@ -2134,11 +2151,13 @@ pub const Timer = struct { if (comptime JSC.is_bindgen) unreachable; + var vm = global.bunVM(); + if (!this.cancelled) { if (this.repeat) { this.io_task.?.deinit(); - var task = Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, global, this) catch unreachable; - VirtualMachine.vm.timer.timeouts.put(VirtualMachine.vm.allocator, this.id, this) catch unreachable; + var task = Timeout.TimeoutTask.createOnJSThread(vm.allocator, global, this) catch unreachable; + vm.timer.timeouts.put(vm.allocator, this.id, this) catch unreachable; this.io_task = task; task.schedule(); } @@ -2148,12 +2167,12 @@ pub const Timer = struct { if (this.repeat) return; - VirtualMachine.vm.timer.active -|= 1; - VirtualMachine.vm.active_tasks -|= 1; + vm.timer.active -|= 1; + vm.active_tasks -|= 1; } else { // the active tasks count is already cleared for canceled timeout, // add one here to neutralize the `-|= 1` in event loop. - VirtualMachine.vm.active_tasks +|= 1; + vm.active_tasks +|= 1; } this.clear(global); @@ -2168,8 +2187,9 @@ pub const Timer = struct { _ = VirtualMachine.vm.timer.timeouts.swapRemove(this.id); if (this.io_task) |task| { task.deinit(); + this.io_task = null; } - VirtualMachine.vm.allocator.destroy(this); + Pool.releaseValue(this); } }; @@ -2181,7 +2201,7 @@ pub const Timer = struct { repeat: bool, ) !void { if (comptime is_bindgen) unreachable; - var timeout = try VirtualMachine.vm.allocator.create(Timeout); + var timeout = Pool.first(globalThis.bunVM().allocator); js.JSValueProtect(globalThis.ref(), callback.asObjectRef()); timeout.* = Timeout{ .id = id, .callback = callback, .interval = countdown.toInt32(), .repeat = repeat }; var task = try Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, globalThis, timeout); diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index b79e6c7ab..55b829caa 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1850,6 +1850,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { has_js_deinited: bool = false, listen_callback: JSC.AnyTask = undefined, allocator: std.mem.Allocator, + keeping_js_alive: bool = false, pub const Class = JSC.NewClass( ThisServer, @@ -1917,15 +1918,17 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { } pub fn deinitIfWeCan(this: *ThisServer) void { - if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) + if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) { + this.deref(); this.deinit(); + } } pub fn stop(this: *ThisServer) void { if (this.listener) |listener| { - listener.close(); this.listener = null; - this.vm.disable_run_us_loop = false; + this.deref(); + listener.close(); } this.deinitIfWeCan(); @@ -2038,22 +2041,26 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { this.listener = socket; const needs_post_handler = this.vm.uws_event_loop == null; this.vm.uws_event_loop = uws.Loop.get(); - this.listen_callback = JSC.AnyTask.New(ThisServer, run).init(this); - this.vm.eventLoop().enqueueTask(JSC.Task.init(&this.listen_callback)); + this.ref(); + if (needs_post_handler) { _ = this.vm.uws_event_loop.?.addPostHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick); + _ = this.vm.uws_event_loop.?.addPreHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick); } } - pub fn run(this: *ThisServer) void { - // this.app.addServerName(hostname_pattern: [*:0]const u8) + pub fn ref(this: *ThisServer) void { + if (this.keeping_js_alive) return; + + this.vm.us_loop_reference_count +|= 1; + this.keeping_js_alive = true; + } - // we do not increment the reference count here - // uWS manages running the loop, so it is unnecessary - // this.vm.us_loop_reference_count +|= 1; - this.vm.disable_run_us_loop = true; + pub fn deref(this: *ThisServer) void { + if (!this.keeping_js_alive) return; - this.app.run(); + this.vm.us_loop_reference_count -|= 1; + this.keeping_js_alive = false; } pub fn onBunInfoRequest(this: *ThisServer, req: *uws.Request, resp: *App.Response) void { @@ -2286,11 +2293,20 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { this.app.get("/src:/*", *ThisServer, this, onSrcRequest); } - this.app.listenWithConfig(*ThisServer, this, onListen, .{ - .port = this.config.port, - .host = this.config.hostname, - .options = 0, - }); + const hostname = bun.span(this.config.hostname); + + if (!(hostname.len == 0 or strings.eqlComptime(hostname, "0.0.0.0"))) { + this.app.listenWithConfig(*ThisServer, this, onListen, .{ + .port = this.config.port, + .options = 0, + }); + } else { + this.app.listenWithConfig(*ThisServer, this, onListen, .{ + .port = this.config.port, + .host = this.config.hostname, + .options = 0, + }); + } } }; } diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index 2ded4e5cb..6a4cc7469 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -2770,7 +2770,6 @@ pub fn castObj(obj: js.JSObjectRef, comptime Type: type) *Type { const JSNode = @import("../js_ast.zig").Macro.JSNode; const LazyPropertiesObject = @import("../js_ast.zig").Macro.LazyPropertiesObject; const ModuleNamespace = @import("../js_ast.zig").Macro.ModuleNamespace; -const FetchTaskletContext = Fetch.FetchTasklet.FetchTaskletContext; const Expect = Test.Expect; const DescribeScope = Test.DescribeScope; const TestScope = Test.TestScope; @@ -2824,7 +2823,6 @@ pub const JSPrivateDataPtr = TaggedPointerUnion(.{ Expect, ExpectPrototype, FetchEvent, - FetchTaskletContext, HTMLRewriter, JSNode, LazyPropertiesObject, @@ -3497,25 +3495,26 @@ pub fn wrapWithHasContainer( } if (comptime maybe_async) { - var vm = ctx.ptr().bunVM(); - vm.tick(); - - var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result); - - switch (promise.status(ctx.ptr().vm())) { - JSC.JSPromise.Status.Pending => { - while (promise.status(ctx.ptr().vm()) == .Pending) { - vm.tick(); - } - result = promise.result(ctx.ptr().vm()); - }, - JSC.JSPromise.Status.Rejected => { - result = promise.result(ctx.ptr().vm()); - exception.* = result.asObjectRef(); - }, - JSC.JSPromise.Status.Fulfilled => { - result = promise.result(ctx.ptr().vm()); - }, + if (result.asPromise() != null or result.asInternalPromise() != null) { + var vm = ctx.ptr().bunVM(); + vm.tick(); + var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result); + + switch (promise.status(ctx.ptr().vm())) { + JSC.JSPromise.Status.Pending => { + while (promise.status(ctx.ptr().vm()) == .Pending) { + vm.tick(); + } + result = promise.result(ctx.ptr().vm()); + }, + JSC.JSPromise.Status.Rejected => { + result = promise.result(ctx.ptr().vm()); + exception.* = result.asObjectRef(); + }, + JSC.JSPromise.Status.Fulfilled => { + result = promise.result(ctx.ptr().vm()); + }, + } } } diff --git a/src/bun.js/bindings/JSBuffer.cpp b/src/bun.js/bindings/JSBuffer.cpp index 4b509f257..a1b9a5d40 100644 --- a/src/bun.js/bindings/JSBuffer.cpp +++ b/src/bun.js/bindings/JSBuffer.cpp @@ -248,8 +248,8 @@ static EncodedJSValue constructFromEncoding(JSGlobalObject* lexicalGlobalObject, result = Bun__encoding__constructFromLatin1(lexicalGlobalObject, view.characters8(), view.length(), static_cast<uint8_t>(encoding)); break; } - case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1 - case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion. + case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1 + case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion. result = JSBuffer__bufferFromPointerAndLength(lexicalGlobalObject, view.characters8(), view.length()); break; } @@ -1216,7 +1216,7 @@ static inline JSC::EncodedJSValue jsBufferPrototypeFunction_writeBody(JSC::JSGlo if (callFrame->argumentCount() > 2) { uint32_t arg_len = 0; arg_len = callFrame->argument(2).toUInt32(lexicalGlobalObject); - length = std::min(arg_len, length-offset); + length = std::min(arg_len, length - offset); } if (callFrame->argumentCount() > 2) { @@ -1329,7 +1329,6 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSBufferConstructor::construc JSC::JSObject* constructor = lexicalGlobalObject->m_typedArrayUint8.constructor(lexicalGlobalObject); - // TODO: avoid this copy MarkedArgumentBuffer args; for (size_t i = 0; i < argsCount; ++i) args.append(callFrame->uncheckedArgument(i)); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 51d55d2e0..a68376872 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { allocator: std.mem.Allocator, promise: JSValue, globalThis: *JSGlobalObject, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - this.allocator.destroy(this); - } - }; -} - -pub fn SerialPromiseTask(comptime Context: type) type { - return struct { - const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1); - const This = @This(); - - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - promise: JSValue, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.event_loop, - .ctx = value, - .allocator = allocator, - .promise = JSValue.createInternalPromise(globalThis), - .globalThis = globalThis, - }; - js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef()); - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx); - this.onFinish(); - } - - pub fn runFromJS(this: This) void { - var promise_value = this.promise; - var promise = promise_value.asInternalPromise() orelse { - if (comptime @hasDecl(Context, "deinit")) { - @call(.{}, Context.deinit, .{this.ctx}); - } - return; - }; - - var ctx = this.ctx; - - js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef()); - ctx.then(promise, this.globalThis); - } - - pub fn schedule(this: *This) void { - SerialWorkPool.schedule(&this.task); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type { event_loop: *JSC.EventLoop, allocator: std.mem.Allocator, globalThis: *JSGlobalObject, + concurrent_task: ConcurrentTask = .{}, pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { var this = try allocator.create(This); @@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type { } pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); - } - - pub fn deinit(this: *This) void { - var allocator = this.allocator; - this.* = undefined; - allocator.destroy(this); - } - }; -} - -pub fn AsyncNativeCallbackTask(comptime Context: type) type { - return struct { - const This = @This(); - ctx: *Context, - task: WorkPoolTask = .{ .callback = runFromThreadPool }, - event_loop: *JSC.EventLoop, - allocator: std.mem.Allocator, - globalThis: *JSGlobalObject, - - pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { - var this = try allocator.create(This); - this.* = .{ - .event_loop = VirtualMachine.vm.eventLoop(), - .ctx = value, - .allocator = allocator, - .globalThis = globalThis, - }; - VirtualMachine.vm.active_tasks +|= 1; - return this; - } - - pub fn runFromThreadPool(task: *WorkPoolTask) void { - var this = @fieldParentPtr(This, "task", task); - Context.run(this.ctx, this); - } - - pub fn runFromJS(this: This) void { - this.ctx.runFromJS(this.globalThis); - } - - pub fn schedule(this: *This) void { - WorkPool.get().schedule(WorkPool.schedule(&this.task)); - } - - pub fn onFinish(this: *This) void { - this.event_loop.enqueueTaskConcurrent(Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn deinit(this: *This) void { @@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{ // PromiseTask, // TimeoutTasklet, }); +const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue; +pub const ConcurrentTask = struct { + task: Task = undefined, + next: ?*ConcurrentTask = null, + + pub const Queue = UnboundedQueue(ConcurrentTask, .next); + + pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask { + this.* = .{ + .task = Task.init(of), + .next = null, + }; + return this; + } +}; const AsyncIO = @import("io"); pub const EventLoop = struct { - ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), - io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0), tasks: Queue = undefined, - concurrent_tasks: Queue = undefined, - concurrent_lock: Lock = Lock.init(), + concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, global: *JSGlobalObject = undefined, virtual_machine: *VirtualMachine = undefined, waker: ?AsyncIO.Waker = null, - + defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); pub fn tickWithCount(this: *EventLoop) u32 { - var finished: u32 = 0; var global = this.global; var global_vm = global.vm(); var vm_ = this.virtual_machine; + var counter: usize = 0; while (this.tasks.readItem()) |task| { + defer counter += 1; switch (task.tag()) { .Microtask => { var micro: *Microtask = task.as(Microtask); micro.run(global); - finished += 1; }, .MicrotaskForDefaultGlobalObject => { var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject); micro.run(global); - finished += 1; }, .FetchTasklet => { var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?; fetch_task.onDone(); - finished += 1; + fetch_task.deinit(); vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(AsyncTransformTask)) => { var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(CopyFilePromiseTask)) => { var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => { var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(BunTimerTimeoutTask)) => { var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?; transform_task.*.runFromJS(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(ReadFileTask)) => { var transform_task: *ReadFileTask = task.get(ReadFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, @typeName(WriteFileTask)) => { var transform_task: *WriteFileTask = task.get(WriteFileTask).?; transform_task.*.runFromJS(); transform_task.deinit(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(AnyTask))) => { var any: *AnyTask = task.get(AnyTask).?; any.run(); - finished += 1; vm_.active_tasks -|= 1; }, @field(Task.Tag, typeBaseName(@typeName(CppTask))) => { var any: *CppTask = task.get(CppTask).?; any.run(global); - finished += 1; vm_.active_tasks -|= 1; }, else => if (Environment.allow_assert) { @@ -390,30 +287,39 @@ pub const EventLoop = struct { global_vm.drainMicrotasks(); } - if (finished > 0) { - _ = this.pending_tasks_count.fetchSub(finished, .Monotonic); + if (this.tasks.count == 0) { + this.tasks.head = 0; } - return finished; + return @truncate(u32, counter); } pub fn tickConcurrent(this: *EventLoop) void { - if (this.ready_tasks_count.load(.Monotonic) > 0) { - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength()); + _ = this.tickConcurrentWithCount(); + } - // TODO: optimzie - this.tasks.ensureUnusedCapacity(add) catch unreachable; + pub fn tickConcurrentWithCount(this: *EventLoop) usize { + var concurrent = this.concurrent_tasks.popBatch(); + const count = concurrent.count; + if (count == 0) + return 0; - { - this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0)); - this.concurrent_tasks.discard(this.concurrent_tasks.count); - } + var iter = concurrent.iterator(); + const start_count = this.tasks.count; + if (start_count == 0) { + this.tasks.head = 0; + } - _ = this.pending_tasks_count.fetchAdd(add, .Monotonic); - _ = this.ready_tasks_count.fetchSub(add, .Monotonic); + this.tasks.ensureUnusedCapacity(count) catch unreachable; + var writable = this.tasks.writableSlice(0); + while (iter.next()) |task| { + writable[0] = task.task; + writable = writable[1..]; + this.tasks.count += 1; + if (writable.len == 0) break; } + + return this.tasks.count - start_count; } // TODO: fix this technical debt @@ -423,7 +329,9 @@ pub const EventLoop = struct { this.tickConcurrent(); var global_vm = ctx.global.vm(); while (true) { - while (this.tickWithCount() > 0) {} else { + while (this.tickWithCount() > 0) { + this.tickConcurrent(); + } else { global_vm.releaseWeakRefs(); global_vm.drainMicrotasks(); this.tickConcurrent(); @@ -436,13 +344,28 @@ pub const EventLoop = struct { break; } - if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + this.global.handleRejectedPromises(); + } + + pub fn runUSocketsLoop(this: *EventLoop) void { + var ctx = this.virtual_machine; + + ctx.global.vm().releaseWeakRefs(); + ctx.global.vm().drainMicrotasks(); + + if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) { + if (this.tickConcurrentWithCount() > 0) { + this.tick(); + } else if (ctx.uws_event_loop.?.num_polls > 0) { + if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) { + _ = ctx.global.vm().runGC(true); + } + } + ctx.is_us_loop_entered = true; ctx.enterUWSLoop(); ctx.is_us_loop_entered = false; } - - this.global.handleRejectedPromises(); } // TODO: fix this technical debt @@ -451,6 +374,10 @@ pub const EventLoop = struct { JSC.JSPromise.Status.Pending => { while (promise.status(this.global.vm()) == .Pending) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } }, else => {}, @@ -459,13 +386,20 @@ pub const EventLoop = struct { pub fn waitForTasks(this: *EventLoop) void { this.tick(); - while (this.pending_tasks_count.load(.Monotonic) > 0) { + while (this.tasks.count > 0) { this.tick(); + + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } + } else { + if (this.virtual_machine.uws_event_loop != null) { + this.runUSocketsLoop(); + } } } pub fn enqueueTask(this: *EventLoop, task: Task) void { - _ = this.pending_tasks_count.fetchAdd(1, .Monotonic); this.tasks.writeItem(task) catch unreachable; } @@ -476,19 +410,25 @@ pub const EventLoop = struct { } } - pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void { + pub fn onDefer(this: *EventLoop) void { + this.defer_count.store(0, .Monotonic); + this.tick(); + } + + pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void { JSC.markBinding(); - this.concurrent_lock.lock(); - defer this.concurrent_lock.unlock(); - this.concurrent_tasks.writeItem(task) catch unreachable; + + this.concurrent_tasks.push(task); + if (this.virtual_machine.uws_event_loop) |loop| { - loop.nextTick(*EventLoop, this, EventLoop.tick); + const deferCount = this.defer_count.fetchAdd(1, .Monotonic); + if (deferCount == 0) { + loop.nextTick(*EventLoop, this, onDefer); + } } - if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) { - if (this.waker) |*waker| { - waker.wake() catch unreachable; - } + if (this.waker) |*waker| { + waker.wake() catch unreachable; } } }; diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig index 5eeff1ba8..4f97a79ad 100644 --- a/src/bun.js/javascript.zig +++ b/src/bun.js/javascript.zig @@ -368,7 +368,6 @@ pub const VirtualMachine = struct { rare_data: ?*JSC.RareData = null, poller: JSC.Poller = JSC.Poller{}, us_loop_reference_count: usize = 0, - disable_run_us_loop: bool = false, is_us_loop_entered: bool = false, pub fn io(this: *VirtualMachine) *IO { @@ -423,7 +422,7 @@ pub const VirtualMachine = struct { this.eventLoop().enqueueTask(task); } - pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: Task) void { + pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: JSC.ConcurrentTask) void { this.eventLoop().enqueueTaskConcurrent(task); } @@ -451,7 +450,7 @@ pub const VirtualMachine = struct { this.macro_event_loop.tasks.ensureTotalCapacity(16) catch unreachable; this.macro_event_loop.global = this.global; this.macro_event_loop.virtual_machine = this; - this.macro_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator); + this.macro_event_loop.concurrent_tasks = .{}; } this.bundler.options.platform = .bun_macro; @@ -555,8 +554,7 @@ pub const VirtualMachine = struct { default_allocator, ); VirtualMachine.vm.regular_event_loop.tasks.ensureUnusedCapacity(64) catch unreachable; - VirtualMachine.vm.regular_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator); - VirtualMachine.vm.regular_event_loop.concurrent_tasks.ensureUnusedCapacity(8) catch unreachable; + VirtualMachine.vm.regular_event_loop.concurrent_tasks = .{}; VirtualMachine.vm.event_loop = &VirtualMachine.vm.regular_event_loop; vm.bundler.macro_context = null; @@ -1435,6 +1433,7 @@ pub const VirtualMachine = struct { pub fn loadEntryPoint(this: *VirtualMachine, entry_path: string) !*JSInternalPromise { this.main = entry_path; try this.entry_point.generate(@TypeOf(this.bundler), &this.bundler, Fs.PathName.init(entry_path), main_file_name); + this.eventLoop().ensureWaker(); var promise: *JSInternalPromise = undefined; @@ -1455,7 +1454,15 @@ pub const VirtualMachine = struct { promise = JSModuleLoader.loadAndEvaluateModule(this.global, &ZigString.init(this.main)); } - this.waitForPromise(promise); + while (promise.status(this.global.vm()) == .Pending) { + this.eventLoop().tick(); + _ = this.eventLoop().waker.?.wait() catch 0; + } + + if (this.us_loop_reference_count > 0) { + _ = this.global.vm().runGC(true); + this.eventLoop().runUSocketsLoop(); + } return promise; } diff --git a/src/bun.js/unbounded_queue.zig b/src/bun.js/unbounded_queue.zig new file mode 100644 index 000000000..fd092290d --- /dev/null +++ b/src/bun.js/unbounded_queue.zig @@ -0,0 +1,149 @@ +const std = @import("std"); + +const os = std.os; +const mem = std.mem; +const meta = std.meta; +const atomic = std.atomic; +const builtin = std.builtin; +const testing = std.testing; + +const assert = std.debug.assert; + +const mpsc = @This(); + +pub const cache_line_length = switch (@import("builtin").target.cpu.arch) { + .x86_64, .aarch64, .powerpc64 => 128, + .arm, .mips, .mips64, .riscv64 => 32, + .s390x => 256, + else => 64, +}; + +pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type { + const next = meta.fieldInfo(T, next_field).name; + + return struct { + const Self = @This(); + + pub const Batch = struct { + pub const Iterator = struct { + batch: Self.Batch, + + pub fn next(self: *Self.Batch.Iterator) ?*T { + if (self.batch.count == 0) return null; + const front = self.batch.front orelse unreachable; + self.batch.front = @field(front, next); + self.batch.count -= 1; + return front; + } + }; + + front: ?*T = null, + last: ?*T = null, + count: usize = 0, + + pub fn iterator(self: Self.Batch) Self.Batch.Iterator { + return .{ .batch = self }; + } + }; + + pub const queue_padding_length = cache_line_length / 2; + + back: ?*T align(queue_padding_length) = null, + count: usize = 0, + front: T align(queue_padding_length) = init: { + var stub: T = undefined; + @field(stub, next) = null; + break :init stub; + }, + + pub fn push(self: *Self, src: *T) void { + assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0); + + @field(src, next) = null; + const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front; + @field(old_back, next) = src; + } + + pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void { + assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0); + + @field(last, next) = null; + const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front; + @field(old_back, next) = first; + } + + pub fn pop(self: *Self) ?*T { + const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null; + if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| { + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front; + if (first != last) return null; + @atomicStore(?*T, &@field(self.front, next), null, .Monotonic); + if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) { + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire); + while (next_item == null) : (atomic.spinLoopHint()) { + next_item = @atomicLoad(?*T, &@field(first, next), .Acquire); + } + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1); + return first; + } + + pub fn popBatch(self: *Self) Self.Batch { + var batch: Self.Batch = .{}; + + var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch; + batch.front = front; + + var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) { + batch.count += 1; + batch.last = front; + + front = next_node; + } + + const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front; + if (front != last) { + @atomicStore(?*T, &@field(self.front, next), front, .Release); + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + @atomicStore(?*T, &@field(self.front, next), null, .Monotonic); + if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) { + batch.count += 1; + batch.last = front; + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + while (next_item == null) : (atomic.spinLoopHint()) { + next_item = @atomicLoad(?*T, &@field(front, next), .Acquire); + } + + batch.count += 1; + @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic); + batch.last = front; + assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count); + return batch; + } + + pub fn peek(self: *Self) usize { + const count = @atomicLoad(usize, &self.count, .Acquire); + assert(count >= 0); + return count; + } + + pub fn isEmpty(self: *Self) bool { + return self.peek() == 0; + } + }; +} diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 6c1fc49f9..2ef8225f3 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -515,90 +515,79 @@ pub const Fetch = struct { ); pub const FetchTasklet = struct { - promise: *JSPromise = undefined, - http: HTTPClient.AsyncHTTP = undefined, - result: HTTPClient.HTTPClientResult = undefined, - status: Status = Status.pending, + http: ?*HTTPClient.AsyncHTTP = null, + result: HTTPClient.HTTPClientResult = .{}, javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, - - empty_request_body: MutableString = undefined, - - context: FetchTaskletContext = undefined, + request_body: Blob = undefined, response_buffer: MutableString = undefined, - - blob_store: ?*Blob.Store = null, - - const Pool = ObjectPool(FetchTasklet, init, true, 32); - const BodyPool = ObjectPool(MutableString, MutableString.init2048, true, 8); - pub const FetchTaskletContext = struct { - tasklet: *FetchTasklet, - }; + request_headers: Headers = Headers{ .allocator = undefined }, + ref: *JSC.napi.Ref = undefined, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet { return FetchTasklet{}; } - pub const Status = enum(u8) { - pending, - running, - done, - }; + fn clearData(this: *FetchTasklet) void { + this.request_headers.entries.deinit(bun.default_allocator); + this.request_headers.buf.deinit(bun.default_allocator); + this.request_headers = Headers{ .allocator = undefined }; + this.http.?.deinit(); + + this.result.deinitMetadata(); + this.response_buffer.deinit(); + this.request_body.detach(); + } + + pub fn deinit(this: *FetchTasklet) void { + if (this.http) |http| this.javascript_vm.allocator.destroy(http); + this.javascript_vm.allocator.destroy(this); + } pub fn onDone(this: *FetchTasklet) void { if (comptime JSC.is_bindgen) unreachable; const globalThis = this.global_this; - const promise = this.promise; - const state = this.http.state.load(.Monotonic); - const result = switch (state) { - .success => this.onResolve(), - .fail => this.onReject(), - else => unreachable, + + var ref = this.ref; + const promise_value = ref.get(globalThis); + defer ref.destroy(globalThis); + + if (promise_value.isEmptyOrUndefinedOrNull()) { + this.clearData(); + return; + } + + var promise = promise_value.asPromise().?; + + const success = this.result.isSuccess(); + const result = switch (success) { + true => this.onResolve(), + false => this.onReject(), }; - this.release(); - const promise_value = promise.asValue(globalThis); - promise_value.unprotect(); + this.clearData(); - switch (state) { - .success => { + promise_value.ensureStillAlive(); + + switch (success) { + true => { promise.resolve(globalThis, result); }, - .fail => { + false => { promise.reject(globalThis, result); }, - else => unreachable, } } - pub fn reset(_: *FetchTasklet) void {} - - pub fn release(this: *FetchTasklet) void { - this.global_this = undefined; - this.javascript_vm = undefined; - this.promise = undefined; - this.status = Status.pending; - // var pooled = this.pooled_body; - // BodyPool.release(pooled); - // this.pooled_body = undefined; - this.http = undefined; - - Pool.release(@fieldParentPtr(Pool.Node, "data", this)); - } - pub fn onReject(this: *FetchTasklet) JSValue { - if (this.blob_store) |store| { - this.blob_store = null; - store.deref(); - } - defer this.result.deinitMetadata(); const fetch_error = std.fmt.allocPrint( default_allocator, "fetch() failed {s}\nurl: \"{s}\"", .{ - @errorName(this.http.err orelse error.HTTPFail), + this.result.fail, this.result.href, }, ) catch unreachable; @@ -606,26 +595,27 @@ pub const Fetch = struct { } pub fn onResolve(this: *FetchTasklet) JSValue { - var allocator = default_allocator; - var http_response = this.http.response.?; + var allocator = this.global_this.bunVM().allocator; + const http_response = this.result.response; var response = allocator.create(Response) catch unreachable; - if (this.blob_store) |store| { - this.blob_store = null; - store.deref(); - } - defer this.result.deinitMetadata(); + const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this); + this.response_buffer = .{ .allocator = default_allocator, .list = .{ + .items = &.{}, + .capacity = 0, + } }; + response.* = Response{ .allocator = allocator, .url = allocator.dupe(u8, this.result.href) catch unreachable, .status_text = allocator.dupe(u8, http_response.status) catch unreachable, - .redirected = this.http.redirected, + .redirected = this.result.redirected, .body = .{ .init = .{ .headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers), .status_code = @truncate(u16, http_response.status_code), }, .value = .{ - .Blob = Blob.init(this.http.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this), + .Blob = blob, }, }, }; @@ -636,38 +626,50 @@ pub const Fetch = struct { allocator: std.mem.Allocator, method: Method, url: ZigURL, - headers: Headers.Entries, - headers_buf: string, - request_body: ?*MutableString, + headers: Headers, + request_body: Blob, timeout: usize, - request_body_store: ?*Blob.Store, - ) !*FetchTasklet.Pool.Node { - var linked_list = FetchTasklet.Pool.get(allocator); - linked_list.data.javascript_vm = VirtualMachine.vm; - linked_list.data.empty_request_body = MutableString.init(allocator, 0) catch unreachable; - // linked_list.data.pooled_body = BodyPool.get(allocator); - linked_list.data.blob_store = request_body_store; - linked_list.data.response_buffer = MutableString.initEmpty(allocator); - linked_list.data.http = HTTPClient.AsyncHTTP.init( + globalThis: *JSC.JSGlobalObject, + promise: JSValue, + ) !*FetchTasklet { + var jsc_vm = globalThis.bunVM(); + var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet); + if (request_body.store) |store| { + store.ref(); + } + + fetch_tasklet.* = .{ + .response_buffer = MutableString{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, + .http = try jsc_vm.allocator.create(HTTPClient.AsyncHTTP), + .javascript_vm = jsc_vm, + .request_body = request_body, + .global_this = globalThis, + .request_headers = headers, + .ref = JSC.napi.Ref.create(globalThis, promise), + }; + fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init( allocator, method, url, - headers, - headers_buf, - &linked_list.data.response_buffer, - request_body orelse &linked_list.data.empty_request_body, + headers.entries, + headers.buf.items, + &fetch_tasklet.response_buffer, + request_body.sharedView(), timeout, - undefined, - ); - linked_list.data.context = .{ .tasklet = &linked_list.data }; - linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New( - *FetchTasklet, - FetchTasklet.callback, - ).init( - &linked_list.data, + HTTPClient.HTTPClientResult.Callback.New( + *FetchTasklet, + FetchTasklet.callback, + ).init( + fetch_tasklet, + ), ); - - return linked_list; + return fetch_tasklet; } pub fn queue( @@ -675,27 +677,36 @@ pub const Fetch = struct { global: *JSGlobalObject, method: Method, url: ZigURL, - headers: Headers.Entries, - headers_buf: string, - request_body: ?*MutableString, + headers: Headers, + request_body: Blob, timeout: usize, - request_body_store: ?*Blob.Store, - ) !*FetchTasklet.Pool.Node { + promise: JSValue, + ) !*FetchTasklet { try HTTPClient.HTTPThread.init(); - var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store); + var node = try get( + allocator, + method, + url, + headers, + request_body, + timeout, + global, + promise, + ); - node.data.global_this = global; var batch = NetworkThread.Batch{}; - node.data.http.schedule(allocator, &batch); - HTTPClient.http_thread.schedule(batch); + node.http.?.schedule(allocator, &batch); VirtualMachine.vm.active_tasks +|= 1; + + HTTPClient.http_thread.schedule(batch); + return node; } pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { task.response_buffer = result.body.?.*; task.result = result; - task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task)); + task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task)); } }; @@ -715,12 +726,11 @@ pub const Fetch = struct { } var headers: ?Headers = null; - var body: MutableString = MutableString.initEmpty(bun.default_allocator); var method = Method.GET; var args = JSC.Node.ArgumentsSlice.from(ctx.bunVM(), arguments); var url: ZigURL = undefined; var first_arg = args.nextEat().?; - var blob_store: ?*Blob.Store = null; + var body: Blob = Blob.initEmpty(ctx); if (first_arg.isString()) { var url_zig_str = ZigString.init(""); JSValue.fromRef(arguments[0]).toZigString(&url_zig_str, globalThis); @@ -737,7 +747,6 @@ pub const Fetch = struct { url_str = getAllocator(ctx).dupe(u8, url_str) catch unreachable; } - NetworkThread.init() catch @panic("Failed to start network thread"); url = ZigURL.parse(url_str); if (arguments.len >= 2 and arguments[1].?.value().isObject()) { @@ -760,18 +769,7 @@ pub const Fetch = struct { if (options.fastGet(ctx.ptr(), .body)) |body__| { if (Blob.fromJS(ctx.ptr(), body__, true, false)) |new_blob| { - if (new_blob.size > 0) { - body = MutableString{ - .list = std.ArrayListUnmanaged(u8){ - .items = bun.constStrToU8(new_blob.sharedView()), - .capacity = new_blob.size, - }, - .allocator = bun.default_allocator, - }; - blob_store = new_blob.store; - } - // transfer is unnecessary here because this is a new slice - //new_blob.transfer(); + body = new_blob; } else |_| { return JSPromise.rejectedPromiseValue(globalThis, ZigString.init("fetch() received invalid body").toErrorInstance(globalThis)).asRef(); } @@ -783,54 +781,28 @@ pub const Fetch = struct { if (request.headers) |head| { headers = Headers.from(head, bun.default_allocator) catch unreachable; } - var blob = request.body.use(); - // TODO: make RequestBody _NOT_ a MutableString - body = MutableString{ - .list = std.ArrayListUnmanaged(u8){ - .items = bun.constStrToU8(blob.sharedView()), - .capacity = bun.constStrToU8(blob.sharedView()).len, - }, - .allocator = blob.allocator orelse bun.default_allocator, - }; - blob_store = blob.store; + body = request.body.use(); } else { const fetch_error = fetch_type_error_strings.get(js.JSValueGetType(ctx, arguments[0])); return JSPromise.rejectedPromiseValue(globalThis, ZigString.init(fetch_error).toErrorInstance(globalThis)).asRef(); } - var header_entries: Headers.Entries = .{}; - var header_buf: string = ""; - - if (headers) |head| { - header_entries = head.entries; - header_buf = head.buf.items; - } - - var request_body: ?*MutableString = null; - if (body.list.items.len > 0) { - var mutable = bun.default_allocator.create(MutableString) catch unreachable; - mutable.* = body; - request_body = mutable; - } + var deferred_promise = JSC.C.JSObjectMakeDeferredPromise(globalThis, null, null, null); // var resolve = FetchTasklet.FetchResolver.Class.make(ctx: js.JSContextRef, ptr: *ZigType) - var queued = FetchTasklet.queue( + _ = FetchTasklet.queue( default_allocator, globalThis, method, url, - header_entries, - header_buf, - request_body, + headers orelse Headers{ + .allocator = bun.default_allocator, + }, + body, std.time.ns_per_hour, - blob_store, + JSC.JSValue.fromRef(deferred_promise), ) catch unreachable; - const promise = JSC.JSPromise.create(ctx); - queued.data.promise = promise; - const promise_value = promise.asValue(ctx); - promise_value.protect(); - - return promise_value.asObjectRef(); + return deferred_promise; } }; @@ -1991,7 +1963,7 @@ pub const Blob = struct { } } pub fn run(this: *ReadFile, task: *ReadFileTask) void { - var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable; + var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task }); } @@ -2020,7 +1992,7 @@ pub const Blob = struct { task.onFinish(); suspend { - HTTPClient.getAllocator().destroy(@frame()); + bun.default_allocator.destroy(@frame()); } } @@ -2236,7 +2208,7 @@ pub const Blob = struct { cb(cb_ctx, .{ .result = @truncate(SizeType, wrote) }); } pub fn run(this: *WriteFile, task: *WriteFileTask) void { - var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable; + var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task }); } @@ -2244,7 +2216,7 @@ pub const Blob = struct { this._runAsync(); task.onFinish(); suspend { - HTTPClient.getAllocator().destroy(@frame()); + bun.default_allocator.destroy(@frame()); } } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 242e3db49..1158a8dd2 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2340,6 +2340,7 @@ pub const FileBlobLoader = struct { read_frame: anyframe = undefined, chunk_size: Blob.SizeType = 0, main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null }, + concurrent_task: JSC.ConcurrentTask = .{}, pub fn taskCallback(task: *NetworkThread.Task) void { var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); @@ -2475,7 +2476,7 @@ pub const FileBlobLoader = struct { pub fn scheduleMainThreadTask(this: *FileBlobLoader) void { this.concurrent.main_thread_task.ctx = this; - this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task)); + this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task)); } fn runAsync(this: *FileBlobLoader) void { diff --git a/src/bun_js.zig b/src/bun_js.zig index bb5f458f3..87b618309 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -31,6 +31,7 @@ const which = @import("which.zig").which; const VirtualMachine = @import("javascript_core").VirtualMachine; const JSC = @import("javascript_core"); const AsyncHTTP = @import("http").AsyncHTTP; +const Arena = @import("./mimalloc_arena.zig").Arena; const OpaqueWrap = JSC.OpaqueWrap; @@ -39,6 +40,7 @@ pub const Run = struct { ctx: Command.Context, vm: *VirtualMachine, entry_path: string, + arena: Arena = undefined, pub fn boot(ctx: Command.Context, file: std.fs.File, entry_path: string) !void { if (comptime JSC.is_bindgen) unreachable; @@ -46,13 +48,16 @@ pub const Run = struct { js_ast.Expr.Data.Store.create(default_allocator); js_ast.Stmt.Data.Store.create(default_allocator); + var arena = try Arena.init(); var run = Run{ - .vm = try VirtualMachine.init(ctx.allocator, ctx.args, null, ctx.log, null), + .vm = try VirtualMachine.init(arena.allocator(), ctx.args, null, ctx.log, null), .file = file, + .arena = arena, .ctx = ctx, .entry_path = entry_path, }; + run.vm.arena = &run.arena; run.vm.argv = ctx.positionals; @@ -131,19 +136,21 @@ pub const Run = struct { } this.vm.global.vm().releaseWeakRefs(); + _ = this.vm.arena.gc(false); _ = this.vm.global.vm().runGC(false); this.vm.tick(); { var any = false; - while (this.vm.*.event_loop.pending_tasks_count.loadUnchecked() > 0 or this.vm.active_tasks > 0) { + while (this.vm.eventLoop().tasks.count > 0 or this.vm.active_tasks > 0) { this.vm.tick(); any = true; if (this.vm.active_tasks > 0) { - if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0) { + if (this.vm.eventLoop().tickConcurrentWithCount() == 0) { + _ = this.vm.arena.gc(false); _ = this.vm.global.vm().runGC(false); - if (this.vm.event_loop.ready_tasks_count.load(.Monotonic) == 0 and + if (this.vm.eventLoop().tickConcurrentWithCount() == 0 and this.vm.active_tasks > 0) { this.vm.event_loop.ensureWaker(); diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index ee6266f40..27531e4ad 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -1845,8 +1845,6 @@ pub const Example = struct { var mutable = try ctx.allocator.create(MutableString); mutable.* = try MutableString.init(ctx.allocator, 8096); - var request_body = try MutableString.init(ctx.allocator, 0); - // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; async_http.* = HTTP.AsyncHTTP.initSync( @@ -1856,7 +1854,7 @@ pub const Example = struct { header_entries, headers_buf, mutable, - &request_body, + "", 60 * std.time.ns_per_min, ); async_http.client.progress_node = progress; @@ -1916,12 +1914,20 @@ pub const Example = struct { var mutable = try ctx.allocator.create(MutableString); mutable.* = try MutableString.init(ctx.allocator, 2048); - var request_body = try MutableString.init(ctx.allocator, 0); url = URL.parse(try std.fmt.bufPrint(&url_buf, "https://registry.npmjs.org/@bun-examples/{s}/latest", .{name})); // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + url, + .{}, + "", + mutable, + "", + 60 * std.time.ns_per_min, + ); async_http.client.progress_node = progress; var response = try async_http.sendSync(true); @@ -1993,7 +1999,16 @@ pub const Example = struct { mutable.reset(); // ensure very stable memory address - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, URL.parse(tarball_url), .{}, "", mutable, &request_body, 60 * std.time.ns_per_min); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + URL.parse(tarball_url), + .{}, + "", + mutable, + "", + 60 * std.time.ns_per_min, + ); async_http.client.progress_node = progress; refresher.maybeRefresh(); @@ -2018,7 +2033,6 @@ pub const Example = struct { url = URL.parse(examples_url); var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - var request_body = try MutableString.init(ctx.allocator, 0); var mutable = try ctx.allocator.create(MutableString); mutable.* = try MutableString.init(ctx.allocator, 2048); @@ -2029,7 +2043,7 @@ pub const Example = struct { .{}, "", mutable, - &request_body, + "", 60 * std.time.ns_per_min, ); diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index 92691fa05..1f3ea95cb 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -36,6 +36,7 @@ var path_buf: [bun.MAX_PATH_BYTES]u8 = undefined; var path_buf2: [bun.MAX_PATH_BYTES]u8 = undefined; const PathString = bun.PathString; const is_bindgen = std.meta.globalOption("bindgen", bool) orelse false; +const HTTPThread = @import("http").HTTPThread; const JSC = @import("javascript_core"); const Jest = JSC.Jest; @@ -296,6 +297,7 @@ pub const TestCommand = struct { }; JSC.C.JSCInitialize(); NetworkThread.init() catch {}; + HTTPThread.init() catch {}; var reporter = try ctx.allocator.create(CommandLineReporter); reporter.* = CommandLineReporter{ .jest = TestRunner{ @@ -443,10 +445,6 @@ pub const TestCommand = struct { Output.flush(); var promise = try vm.loadEntryPoint(resolution.path_pair.primary.text); - while (promise.status(vm.global.vm()) == .Pending) { - vm.tick(); - } - switch (promise.status(vm.global.vm())) { .Rejected => { var result = promise.result(vm.global.vm()); diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index 15881e984..912dbe921 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -207,7 +207,6 @@ pub const UpgradeCommand = struct { } var metadata_body = try MutableString.init(allocator, 2048); - var request_body = try MutableString.init(allocator, 0); // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable; @@ -218,7 +217,7 @@ pub const UpgradeCommand = struct { header_entries, headers_buf, &metadata_body, - &request_body, + "", 60 * std.time.ns_per_min, ); if (!silent) async_http.client.progress_node = progress; @@ -441,7 +440,6 @@ pub const UpgradeCommand = struct { var async_http = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; var zip_file_buffer = try ctx.allocator.create(MutableString); zip_file_buffer.* = try MutableString.init(ctx.allocator, @maximum(version.size, 1024)); - var request_buffer = try MutableString.init(ctx.allocator, 0); async_http.* = HTTP.AsyncHTTP.initSync( ctx.allocator, @@ -450,7 +448,7 @@ pub const UpgradeCommand = struct { .{}, "", zip_file_buffer, - &request_buffer, + "", timeout, ); async_http.client.timeout = timeout; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index a2a7da143..103a8dc7c 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -262,11 +262,58 @@ pub const SocketTLS = NewSocketHandler(true); pub const us_timer_t = opaque {}; pub const us_socket_context_t = opaque {}; -pub const Loop = opaque { +pub const Loop = extern struct { + internal_loop_data: InternalLoopData align(16), + + /// Number of non-fallthrough polls in the loop + num_polls: c_int, + + /// Number of ready polls this iteration + num_ready_polls: c_int, + + /// Current index in list of ready polls + current_ready_poll: c_int, + + /// Loop's own file descriptor + fd: c_int, + + /// The list of ready polls + ready_polls: [1024]EventType, + + const EventType = if (Environment.isLinux) std.os.linux.epoll_event else if (Environment.isMac) std.os.Kevent; + + pub const InternalLoopData = extern struct { + pub const us_internal_async = opaque {}; + + sweep_timer: ?*us_timer_t, + wakeup_async: ?*us_internal_async, + last_write_failed: c_int, + head: ?*us_socket_context_t, + iterator: ?*us_socket_context_t, + recv_buf: [*]u8, + ssl_data: ?*anyopaque, + pre_cb: ?fn (?*Loop) callconv(.C) void, + post_cb: ?fn (?*Loop) callconv(.C) void, + closed_head: ?*Socket, + low_prio_head: ?*Socket, + low_prio_budget: c_int, + iteration_nr: c_longlong, + }; + pub fn get() ?*Loop { return uws_get_loop(); } + pub fn create(comptime Handler: anytype) *Loop { + return us_create_loop( + null, + Handler.wakeup, + if (@hasDecl(Handler, "pre")) Handler.pre else null, + if (@hasDecl(Handler, "post")) Handler.post else null, + 0, + ).?; + } + pub fn wakeup(this: *Loop) void { return us_wakeup_loop(this); } @@ -305,6 +352,15 @@ pub const Loop = opaque { }; } + pub fn addPreHandler(this: *Loop, comptime UserType: type, ctx: UserType, comptime callback: fn (UserType) void) NewHandler(UserType, callback) { + const Handler = NewHandler(UserType, callback); + + uws_loop_addPreHandler(this, ctx, Handler.callback); + return Handler{ + .loop = this, + }; + } + pub fn run(this: *Loop) void { us_loop_run(this); } @@ -312,7 +368,7 @@ pub const Loop = opaque { extern fn uws_loop_defer(loop: *Loop, ctx: *anyopaque, cb: fn (ctx: *anyopaque) callconv(.C) void) void; extern fn uws_get_loop() ?*Loop; - extern fn us_create_loop(hint: ?*anyopaque, wakeup_cb: ?fn (?*Loop) callconv(.C) void, pre_cb: ?fn (?*Loop) callconv(.C) void, post_cb: ?fn (?*Loop) callconv(.C) void, ext_size: c_uint) ?*Loop; + extern fn us_create_loop(hint: ?*anyopaque, wakeup_cb: ?fn (*Loop) callconv(.C) void, pre_cb: ?fn (*Loop) callconv(.C) void, post_cb: ?fn (*Loop) callconv(.C) void, ext_size: c_uint) ?*Loop; extern fn us_loop_free(loop: ?*Loop) void; extern fn us_loop_ext(loop: ?*Loop) ?*anyopaque; extern fn us_loop_run(loop: ?*Loop) void; @@ -322,6 +378,8 @@ pub const Loop = opaque { extern fn us_loop_iteration_number(loop: ?*Loop) c_longlong; extern fn uws_loop_addPostHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void; extern fn uws_loop_removePostHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void; + extern fn uws_loop_addPreHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void; + extern fn uws_loop_removePreHandler(loop: *Loop, ctx: *anyopaque, cb: (fn (ctx: *anyopaque, loop: *Loop) callconv(.C) void)) void; }; const uintmax_t = c_ulong; @@ -372,9 +430,8 @@ pub const Poll = opaque { val: Data, fallthrough: bool, flags: Flags, - callback: CallbackType, ) ?*Poll { - var poll = us_create_callback(loop, @as(c_int, @boolToInt(fallthrough)), file, @sizeOf(Data)); + var poll = us_create_poll(loop, @as(c_int, @boolToInt(fallthrough)), @sizeOf(Data)); if (comptime Data != void) { poll.data(Data).* = val; } @@ -386,8 +443,7 @@ pub const Poll = opaque { if (flags.write) { flags_int |= Flags.write_flag; } - - us_callback_set(poll, flags_int, callback); + us_poll_init(poll, file, flags_int); return poll; } @@ -403,10 +459,17 @@ pub const Poll = opaque { return @intCast(@import("std").os.fd_t, us_poll_fd(self)); } - pub fn start(self: *Poll, poll_type: Flags) void { - // us_poll_start(self, loop: ?*Loop, events: c_int) - _ = self; - _ = poll_type; + pub fn start(self: *Poll, loop: *Loop, flags: Flags) void { + var flags_int: c_int = 0; + if (flags.read) { + flags_int |= Flags.read_flag; + } + + if (flags.write) { + flags_int |= Flags.write_flag; + } + + us_poll_start(self, loop, flags_int); } pub const Flags = struct { @@ -424,9 +487,9 @@ pub const Poll = opaque { } // (void* userData, int fd, int events, int error, struct us_poll_t *poll) - pub const CallbackType = fn (?*anyopaque, c_int, c_int, c_int, *Poll) void; - extern fn us_create_callback(loop: ?*Loop, fallthrough: c_int, fd: c_int, ext_size: c_uint) *Poll; - extern fn us_callback_set(poll: *Poll, events: c_int, callback: CallbackType) *Poll; + pub const CallbackType = fn (?*anyopaque, c_int, c_int, c_int, *Poll) callconv(.C) void; + extern fn us_create_poll(loop: ?*Loop, fallthrough: c_int, ext_size: c_uint) *Poll; + extern fn us_poll_set(poll: *Poll, events: c_int, callback: CallbackType) *Poll; extern fn us_poll_free(p: ?*Poll, loop: ?*Loop) void; extern fn us_poll_init(p: ?*Poll, fd: c_int, poll_type: c_int) void; extern fn us_poll_start(p: ?*Poll, loop: ?*Loop, events: c_int) void; diff --git a/src/feature_flags.zig b/src/feature_flags.zig index 692ee6c1d..c3a9fea10 100644 --- a/src/feature_flags.zig +++ b/src/feature_flags.zig @@ -70,6 +70,7 @@ pub const verbose_analytics = false; pub const disable_compression_in_http_client = false; +pub const enable_keepalive = true; // Not sure why... // But this is slower! // ~/Build/throw diff --git a/src/global.zig b/src/global.zig index 4db835be3..313a5a2ab 100644 --- a/src/global.zig +++ b/src/global.zig @@ -326,3 +326,5 @@ pub fn rand(bytes: []u8) void { const BoringSSL = @import("boringssl"); _ = BoringSSL.RAND_bytes(bytes.ptr, bytes.len); } + +pub const ObjectPool = @import("./pool.zig").ObjectPool; diff --git a/src/http/websocket_http_client.zig b/src/http/websocket_http_client.zig index 6c9cc6004..64ee3f778 100644 --- a/src/http/websocket_http_client.zig +++ b/src/http/websocket_http_client.zig @@ -134,7 +134,7 @@ pub fn NewHTTPUpgradeClient(comptime ssl: bool) type { pub fn register(global: *JSC.JSGlobalObject, loop_: *anyopaque, ctx_: *anyopaque) callconv(.C) void { var vm = global.bunVM(); - var loop = @ptrCast(*uws.Loop, loop_); + var loop = @ptrCast(*uws.Loop, @alignCast(@alignOf(uws.Loop), loop_)); var ctx: *uws.us_socket_context_t = @ptrCast(*uws.us_socket_context_t, ctx_); if (vm.uws_event_loop) |other| { @@ -762,7 +762,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { pub fn register(global: *JSC.JSGlobalObject, loop_: *anyopaque, ctx_: *anyopaque) callconv(.C) void { var vm = global.bunVM(); - var loop = @ptrCast(*uws.Loop, loop_); + var loop = @ptrCast(*uws.Loop, @alignCast(@alignOf(uws.Loop), loop_)); + var ctx: *uws.us_socket_context_t = @ptrCast(*uws.us_socket_context_t, ctx_); if (vm.uws_event_loop) |other| { @@ -1416,7 +1417,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { adopted.receive_buffer.ensureTotalCapacity(2048) catch return null; adopted.event_loop_ref = true; adopted.globalThis.bunVM().us_loop_reference_count +|= 1; - _ = globalThis.bunVM().eventLoop().ready_tasks_count.fetchAdd(1, .Monotonic); + globalThis.bunVM().active_tasks += 1; + var buffered_slice: []u8 = buffered_data[0..buffered_data_len]; if (buffered_slice.len > 0) { const InitialDataHandler = struct { @@ -1455,7 +1457,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { if (this.event_loop_ref) { this.event_loop_ref = false; this.globalThis.bunVM().us_loop_reference_count -|= 1; - _ = this.globalThis.bunVM().eventLoop().ready_tasks_count.fetchSub(1, .Monotonic); + this.globalThis.bunVM().active_tasks -|= 1; } this.outgoing_websocket = null; diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 869ff8724..321b0efb2 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -39,6 +39,11 @@ pub var http_thread: HTTPThread = undefined; const HiveArray = @import("./hive_array.zig").HiveArray; const Batch = NetworkThread.Batch; const TaggedPointerUnion = @import("./tagged_pointer.zig").TaggedPointerUnion; +const DeadSocket = opaque {}; +var dead_socket = @intToPtr(*DeadSocket, 1); + +const print_every = 0; +var print_every_i: usize = 0; fn NewHTTPContext(comptime ssl: bool) type { return struct { @@ -48,28 +53,24 @@ fn NewHTTPContext(comptime ssl: bool) type { hostname_buf: [MAX_KEEPALIVE_HOSTNAME]u8 = undefined, hostname_len: u8 = 0, port: u16 = 0, - - pub fn close(this: *PooledSocket) void { - this.* = undefined; - - if (comptime ssl) { - http_thread.https_context.keep_alive_sockets.unset(http_thread.https_context.pending_sockets.indexOf(this).?); - std.debug.assert(http_thread.https_context.pending_sockets.put(this)); - } else { - http_thread.http_context.keep_alive_sockets.unset(http_thread.http_context.pending_sockets.indexOf(this).?); - std.debug.assert(http_thread.http_context.pending_sockets.put(this)); - } - } }; pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(), - keep_alive_sockets: std.bit_set.IntegerBitSet(pool_size + 1) = std.bit_set.IntegerBitSet(pool_size + 1).initEmpty(), us_socket_context: *uws.us_socket_context_t, const Context = @This(); pub const HTTPSocket = uws.NewSocketHandler(ssl); + pub fn context() *@This() { + if (comptime ssl) { + return &http_thread.https_context; + } else { + return &http_thread.http_context; + } + } + const ActiveSocket = TaggedPointerUnion(.{ + DeadSocket, HTTPClient, PooledSocket, }); @@ -80,7 +81,7 @@ fn NewHTTPContext(comptime ssl: bool) type { pub fn init(this: *@This()) !void { var opts: uws.us_socket_context_options_t = undefined; @memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t)); - this.us_socket_context = uws.us_create_socket_context(ssl_int, uws.Loop.get(), @sizeOf(usize), opts).?; + this.us_socket_context = uws.us_create_socket_context(ssl_int, http_thread.loop, @sizeOf(usize), opts).?; HTTPSocket.configure( this.us_socket_context, @@ -97,22 +98,26 @@ fn NewHTTPContext(comptime ssl: bool) type { std.debug.assert(!socket.isShutdown()); std.debug.assert(socket.isEstablished()); } + std.debug.assert(hostname.len > 0); + std.debug.assert(port > 0); - if (hostname.len <= MAX_KEEPALIVE_HOSTNAME) { + if (hostname.len <= MAX_KEEPALIVE_HOSTNAME and !socket.isClosed() and !socket.isShutdown() and socket.isEstablished()) { if (this.pending_sockets.get()) |pending| { + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr()); + socket.flush(); + socket.timeout(60); + pending.http_socket = socket; @memcpy(&pending.hostname_buf, hostname.ptr, hostname.len); pending.hostname_len = @truncate(u8, hostname.len); pending.port = port; - this.keep_alive_sockets.set( - this.pending_sockets.indexOf(pending).?, - ); - pending.http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(pending).ptr()); - log("Releasing socket for reuse {s}:{d}", .{ hostname, port }); + + log("- Keep-Alive release {s}:{d}", .{ hostname, port }); return; } } + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); socket.close(0, null); } @@ -121,9 +126,20 @@ fn NewHTTPContext(comptime ssl: bool) type { ptr: *anyopaque, socket: HTTPSocket, ) void { - if (ActiveSocket.from(bun.cast(**anyopaque, ptr).*).get(HTTPClient)) |client| { + const active = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + if (active.get(HTTPClient)) |client| { return client.onOpen(comptime ssl, socket); } + + if (active.get(PooledSocket)) |pooled| { + std.debug.assert(context().pending_sockets.put(pooled)); + } + + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); + socket.close(0, null); + if (comptime Environment.allow_assert) { + std.debug.assert(false); + } } pub fn onClose( ptr: *anyopaque, @@ -132,15 +148,17 @@ fn NewHTTPContext(comptime ssl: bool) type { _: ?*anyopaque, ) void { var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); + if (tagged.get(HTTPClient)) |client| { return client.onClose(comptime ssl, socket); } - if (tagged.get(PooledSocket)) |client| { - return client.close(); + if (tagged.get(PooledSocket)) |pooled| { + std.debug.assert(context().pending_sockets.put(pooled)); } - unreachable; + return; } pub fn onData( ptr: *anyopaque, @@ -155,9 +173,9 @@ fn NewHTTPContext(comptime ssl: bool) type { if (comptime ssl) &http_thread.https_context else &http_thread.http_context, socket, ); + } else { + log("Unexpected data on socket", .{}); } - - unreachable; } pub fn onWritable( ptr: *anyopaque, @@ -171,24 +189,26 @@ fn NewHTTPContext(comptime ssl: bool) type { socket, ); } - - unreachable; } pub fn onTimeout( ptr: *anyopaque, socket: HTTPSocket, ) void { var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + ActiveSocket.init(&dead_socket).ptr(), + ); + if (tagged.get(HTTPClient)) |client| { return client.onTimeout( comptime ssl, socket, ); } else if (tagged.get(PooledSocket)) |pooled| { - pooled.close(); + std.debug.assert(context().pending_sockets.put(pooled)); + return; } - - unreachable; } pub fn onConnectError( ptr: *anyopaque, @@ -202,7 +222,8 @@ fn NewHTTPContext(comptime ssl: bool) type { socket, ); } else if (tagged.get(PooledSocket)) |pooled| { - pooled.close(); + std.debug.assert(context().pending_sockets.put(pooled)); + return; } unreachable; @@ -212,13 +233,17 @@ fn NewHTTPContext(comptime ssl: bool) type { socket: HTTPSocket, ) void { var tagged = ActiveSocket.from(bun.cast(**anyopaque, ptr).*); + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(dead_socket).ptr()); + if (tagged.get(HTTPClient)) |client| { return client.onEnd( comptime ssl, socket, ); } else if (tagged.get(PooledSocket)) |pooled| { - pooled.close(); + std.debug.assert(context().pending_sockets.put(pooled)); + + return; } unreachable; @@ -229,22 +254,30 @@ fn NewHTTPContext(comptime ssl: bool) type { if (hostname.len > MAX_KEEPALIVE_HOSTNAME) return null; - var iter = this.keep_alive_sockets.iterator(.{ - .kind = .set, - }); - while (iter.next()) |index_i| { - const index = @truncate(u16, index_i); - var socket = this.pending_sockets.at(index); + var iter = this.pending_sockets.available.iterator(.{ .kind = .unset }); + + while (iter.next()) |pending_socket_index| { + var socket = this.pending_sockets.at(@intCast(u16, pending_socket_index)); if (socket.port != port) { continue; } - std.debug.assert(!this.pending_sockets.available.isSet(index)); - if (strings.eqlLong(socket.hostname_buf[0..socket.hostname_len], hostname, true)) { const http_socket = socket.http_socket; - socket.close(); - log("Keep-alive socket found for {s}:{d}.", .{ hostname, port }); + std.debug.assert(context().pending_sockets.put(socket)); + + if (http_socket.isClosed()) { + http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); + continue; + } + + if (http_socket.isShutdown()) { + http_socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(&dead_socket).ptr()); + http_socket.close(0, null); + continue; + } + + log("+ Keep-Alive reuse {s}:{d}", .{ hostname, port }); return http_socket; } } @@ -252,16 +285,22 @@ fn NewHTTPContext(comptime ssl: bool) type { return null; } - pub fn connect(this: *@This(), client: *HTTPClient, hostname_: []const u8, port: u16) !HTTPSocket { - const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost")) - "127.0.0.1" - else - hostname_; - - if (this.existingSocket(hostname, port)) |sock| { - sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr()); - client.onOpen(comptime ssl, sock); - return sock; + pub fn connect(this: *@This(), client: *HTTPClient, hostname: []const u8, port: u16) !HTTPSocket { + // const hostname = if (FeatureFlags.hardcode_localhost_to_127_0_0_1 and strings.eqlComptime(hostname_, "localhost")) + // "127.0.0.1" + // else + // hostname_; + + client.connected_url = client.url; + client.connected_url.hostname = hostname; + + if (comptime FeatureFlags.enable_keepalive) { + if (this.existingSocket(hostname, port)) |sock| { + sock.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr()); + client.allow_retry = true; + client.onOpen(comptime ssl, sock); + return sock; + } } if (HTTPSocket.connectAnon( @@ -270,6 +309,7 @@ fn NewHTTPContext(comptime ssl: bool) type { this.us_socket_context, undefined, )) |socket| { + client.allow_retry = false; socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, ActiveSocket.init(client).ptr()); return socket; } @@ -279,6 +319,9 @@ fn NewHTTPContext(comptime ssl: bool) type { }; } +const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue; +const Queue = UnboundedQueue(AsyncHTTP, .next); + pub const HTTPThread = struct { var http_thread_loaded: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false); @@ -286,9 +329,7 @@ pub const HTTPThread = struct { http_context: NewHTTPContext(false), https_context: NewHTTPContext(true), - queued_tasks_mutex: Lock = Lock.init(), - queued_tasks: Batch = .{}, - processing_tasks: Batch = .{}, + queued_tasks: Queue = Queue{}, has_awoken: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), timer: std.time.Timer = undefined, const threadlog = Output.scoped(.HTTPThread, true); @@ -319,8 +360,15 @@ pub const HTTPThread = struct { Output.Source.configureNamedThread("HTTP Client"); default_arena = Arena.init() catch unreachable; default_allocator = default_arena.allocator(); - var loop = uws.Loop.get().?; - _ = loop.addPostHandler(*HTTPThread, &http_thread, drainEvents); + var loop = uws.Loop.create(struct { + pub fn wakeup(_: *uws.Loop) callconv(.C) void { + http_thread.drainEvents(); + } + + pub fn pre(_: *uws.Loop) callconv(.C) void {} + pub fn post(_: *uws.Loop) callconv(.C) void {} + }); + http_thread.loop = loop; http_thread.http_context.init() catch @panic("Failed to init http context"); http_thread.https_context.init() catch @panic("Failed to init https context"); @@ -328,16 +376,6 @@ pub const HTTPThread = struct { http_thread.processEvents(); } - fn queueEvents(this: *@This()) void { - this.queued_tasks_mutex.lock(); - defer this.queued_tasks_mutex.unlock(); - if (this.queued_tasks.len == 0) - return; - threadlog("Received {d} tasks\n", .{this.queued_tasks.len}); - this.processing_tasks.push(this.queued_tasks); - this.queued_tasks = .{}; - } - pub fn connect(this: *@This(), client: *HTTPClient, comptime is_ssl: bool) !NewHTTPContext(is_ssl).HTTPSocket { return try this.context(is_ssl).connect(client, client.url.hostname, client.url.getPortAuto()); } @@ -347,25 +385,33 @@ pub const HTTPThread = struct { } fn drainEvents(this: *@This()) void { - this.queueEvents(); - var count: usize = 0; + var remaining: usize = AsyncHTTP.max_simultaneous_requests - AsyncHTTP.active_requests_count.loadUnchecked(); + if (remaining == 0) return; + defer { + if (comptime Environment.allow_assert) { + if (count > 0) + log("Processed {d} tasks\n", .{count}); + } + } - while (this.processing_tasks.pop()) |task| { - var callback = task.callback; - callback(task); + while (this.queued_tasks.pop()) |http| { + var cloned = default_allocator.create(AsyncHTTP) catch unreachable; + cloned.* = http.*; + cloned.real = http; + cloned.onStart(); if (comptime Environment.allow_assert) { count += 1; } - } - if (comptime Environment.allow_assert) { - if (count > 0) - log("Processed {d} tasks\n", .{count}); + remaining -= 1; + if (remaining == 0) break; } } fn processEvents_(this: *@This()) void { + this.loop.num_polls = @maximum(2, this.loop.num_polls); + while (true) { this.drainEvents(); @@ -392,9 +438,11 @@ pub const HTTPThread = struct { return; { - this.queued_tasks_mutex.lock(); - defer this.queued_tasks_mutex.unlock(); - this.queued_tasks.push(batch); + var batch_ = batch; + while (batch_.pop()) |task| { + var http: *AsyncHTTP = @fieldParentPtr(AsyncHTTP, "task", task); + this.queued_tasks.push(http); + } } if (this.has_awoken.load(.Monotonic)) @@ -439,6 +487,12 @@ pub fn onClose( _ = socket; log("Closed {s}\n", .{client.url.href}); + if (client.allow_retry) { + client.allow_retry = false; + client.start(client.state.request_body, client.state.body_out_str.?); + return; + } + if (client.state.stage != .done and client.state.stage != .fail) client.fail(error.ConnectionClosed); } @@ -554,12 +608,9 @@ pub const InternalState = struct { this.compressed_body = null; } - if (this.body_out_str) |body| { - body.reset(); - } - + var body_msg = this.body_out_str; this.* = .{ - .body_out_str = this.body_out_str, + .body_out_str = body_msg, }; } @@ -632,6 +683,7 @@ connected_url: URL = URL{}, allocator: std.mem.Allocator, verbose: bool = Environment.isTest, remaining_redirect_count: i8 = default_redirect_count, +allow_retry: bool = false, redirect: ?*URLBufferPool.Node = null, timeout: usize = 0, progress_node: ?*std.Progress.Node = null, @@ -664,13 +716,11 @@ pub fn init( }; } -pub fn deinit(this: *HTTPClient) !void { +pub fn deinit(this: *HTTPClient) void { if (this.redirect) |redirect| { redirect.release(); this.redirect = null; } - - this.state.reset(); } const Stage = enum(u8) { @@ -772,74 +822,20 @@ pub const HTTPChannelContext = struct { } }; -// This causes segfaults when resume connect() -pub const KeepAlive = struct { - const limit = 2; - pub const disabled = true; - fds: [limit]u32 = undefined, - hosts: [limit]u64 = undefined, - ports: [limit]u16 = undefined, - used: u8 = 0, - - pub var instance = KeepAlive{}; - - pub fn append(this: *KeepAlive, host: []const u8, port: u16, fd: os.socket_t) bool { - if (disabled) return false; - if (this.used >= limit or fd > std.math.maxInt(u32)) return false; - - const i = this.used; - const hash = std.hash.Wyhash.hash(0, host); - - this.fds[i] = @truncate(u32, @intCast(u64, fd)); - this.hosts[i] = hash; - this.ports[i] = port; - this.used += 1; - return true; - } - pub fn find(this: *KeepAlive, host: []const u8, port: u16) ?os.socket_t { - if (disabled) return null; - - if (this.used == 0) { - return null; - } - - const hash = std.hash.Wyhash.hash(0, host); - const list = this.hosts[0..this.used]; - for (list) |host_hash, i| { - if (host_hash == hash and this.ports[i] == port) { - const fd = this.fds[i]; - const last = this.used - 1; - - if (i > last) { - const end_host = this.hosts[last]; - const end_fd = this.fds[last]; - const end_port = this.ports[last]; - this.hosts[i] = end_host; - this.fds[i] = end_fd; - this.ports[i] = end_port; - } - this.used -= 1; - - return @intCast(os.socket_t, fd); - } - } - - return null; - } -}; - pub const AsyncHTTP = struct { request: ?picohttp.Request = null, response: ?picohttp.Response = null, request_headers: Headers.Entries = Headers.Entries{}, response_headers: Headers.Entries = Headers.Entries{}, response_buffer: *MutableString, - request_body: *MutableString, + request_body: []const u8 = "", allocator: std.mem.Allocator, request_header_buf: string = "", method: Method = Method.GET, max_retry_count: u32 = 0, url: URL, + real: ?*AsyncHTTP = null, + next: ?*AsyncHTTP = null, task: ThreadPool.Task = ThreadPool.Task{ .callback = startAsyncHTTP }, completion_callback: HTTPClientResult.Callback = undefined, @@ -859,8 +855,15 @@ pub const AsyncHTTP = struct { elapsed: u64 = 0, gzip_elapsed: u64 = 0, - pub var active_requests_count = std.atomic.Atomic(u32).init(0); - pub var max_simultaneous_requests: u16 = 32; + pub var active_requests_count = std.atomic.Atomic(usize).init(0); + pub var max_simultaneous_requests: usize = 256; + + pub fn deinit(this: *AsyncHTTP) void { + this.response_headers.deinit(this.allocator); + this.response_headers = .{}; + this.request = null; + this.response = null; + } pub const State = enum(u32) { pending = 0, @@ -878,7 +881,7 @@ pub const AsyncHTTP = struct { headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, - request_body: *MutableString, + request_body: []const u8, timeout: usize, callback: HTTPClientResult.Callback, ) AsyncHTTP { @@ -905,7 +908,7 @@ pub const AsyncHTTP = struct { headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, - request_body: *MutableString, + request_body: []const u8, timeout: usize, ) AsyncHTTP { return @This().init( @@ -929,7 +932,6 @@ pub const AsyncHTTP = struct { } pub fn schedule(this: *AsyncHTTP, _: std.mem.Allocator, batch: *ThreadPool.Batch) void { - HTTPThread.init() catch unreachable; this.state.store(.scheduled, .Monotonic); batch.push(ThreadPool.Batch.from(&this.task)); } @@ -953,7 +955,7 @@ pub const AsyncHTTP = struct { http_thread.schedule(batch); while (true) { const result: HTTPClientResult = ctx.channel.readItem() catch unreachable; - if (result.fail != error.NoError) { + if (!result.isSuccess()) { return result.fail; } @@ -964,23 +966,44 @@ pub const AsyncHTTP = struct { } pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void { + std.debug.assert(this.real != null); + const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); + std.debug.assert(active_requests > 0); + var completion = this.completion_callback; this.response = result.response; this.elapsed = http_thread.timer.read() -| this.elapsed; this.redirected = this.client.remaining_redirect_count != default_redirect_count; - if (result.fail != error.NoError) { + if (!result.isSuccess()) { this.err = result.fail; this.state.store(State.fail, .Monotonic); } else { this.err = null; this.state.store(.success, .Monotonic); } + this.client.deinit(); + + this.real.?.* = this.*; + this.real.?.response_buffer = this.response_buffer; + + log("onAsyncHTTPComplete: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); + + default_allocator.destroy(this); completion.function(completion.ctx, result); + + if (active_requests == AsyncHTTP.max_simultaneous_requests) { + http_thread.drainEvents(); + } } pub fn startAsyncHTTP(task: *Task) void { var this = @fieldParentPtr(AsyncHTTP, "task", task); + this.onStart(); + } + + pub fn onStart(this: *AsyncHTTP) void { + _ = active_requests_count.fetchAdd(1, .Monotonic); this.err = null; this.state.store(.sending, .Monotonic); this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init( @@ -988,8 +1011,12 @@ pub const AsyncHTTP = struct { ); this.elapsed = http_thread.timer.read(); + if (this.response_buffer.list.capacity == 0) { + this.response_buffer.allocator = default_allocator; + } + this.client.start(this.request_body, this.response_buffer); - this.client.start(this.request_body.list.items, this.response_buffer); + log("onStart: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); } }; @@ -1130,7 +1157,6 @@ pub fn start(this: *HTTPClient, body: []const u8, body_out_str: *MutableString) } fn start_(this: *HTTPClient, comptime is_ssl: bool) void { - this.connected_url = this.url; var socket = http_thread.connect(this, is_ssl) catch |err| { this.fail(err); return; @@ -1161,8 +1187,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s writer, request, ) catch { - this.fail(error.OutOfMemory); - socket.close(0, null); + this.closeAndFail(error.OutOfMemory, is_ssl, socket); return; }; @@ -1187,8 +1212,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } if (amount < 0) { - this.fail(error.WriteFailed); - socket.close(0, null); + this.closeAndFail(error.WriteFailed, is_ssl, socket); return; } @@ -1219,8 +1243,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s const to_send = this.state.request_body; const amount = socket.write(to_send, true); if (amount < 0) { - this.fail(error.WriteFailed); - socket.close(0, null); + this.closeAndFail(error.WriteFailed, is_ssl, socket); return; } @@ -1236,6 +1259,15 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s } } +pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { + socket.ext(**anyopaque).?.* = bun.cast( + **anyopaque, + NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr(), + ); + this.fail(err); + socket.close(0, null); +} + pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { switch (this.state.response_stage) { .pending, .headers => { @@ -1244,28 +1276,32 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u var amount_read: usize = 0; var needs_move = true; if (this.state.request_message) |req_msg| { - var available = req_msg.available(); + var available = req_msg.buf; if (available.len == 0) { this.state.request_message.?.release(); this.state.request_message = null; this.fail(error.ResponseHeadersTooLarge); + socket.shutdown(); socket.close(0, null); return; } + const wrote = @minimum(available.len - req_msg.used, incoming_data.len); @memcpy( - req_msg.available().ptr, + available.ptr + req_msg.used, incoming_data.ptr, - @minimum(available.len, incoming_data.len), + wrote, ); - req_msg.used += @truncate(u32, incoming_data.len); - amount_read = @truncate(u32, req_msg.sent); + req_msg.used += @truncate(u32, wrote); + amount_read = 0; req_msg.sent = 0; needs_move = false; - to_read = req_msg.slice(); - pending_buffers[1] = incoming_data[@minimum(available.len, incoming_data.len)..]; + to_read = available[0..req_msg.used]; + pending_buffers[1] = incoming_data[wrote..]; } + this.state.pending_response = picohttp.Response{}; + const response = picohttp.Response.parseParts( to_read, &this.response_headers_buf, @@ -1273,29 +1309,28 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u ) catch |err| { switch (err) { error.ShortRead => { - socket.timeout(60); if (needs_move) { std.debug.assert(this.state.request_message == null); this.state.request_message = AsyncMessage.get(default_allocator); if (to_read.len > this.state.request_message.?.buf.len) { - this.fail(error.ResponseHeadersTooLarge); - socket.close(0, null); + this.closeAndFail(error.ResponseHeadersTooLarge, is_ssl, socket); return; } _ = this.state.request_message.?.writeAll(incoming_data); - this.state.request_message.?.sent = @truncate(u32, to_read.len); - return; + this.state.request_message.?.sent = @truncate(u32, amount_read); } + + socket.timeout(60); }, else => { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }, } - unreachable; + return; }; + pending_buffers[0] = to_read[@minimum(@intCast(usize, response.bytes_read), to_read.len)..]; if (pending_buffers[0].len == 0 and pending_buffers[1].len > 0) { pending_buffers[0] = pending_buffers[1]; @@ -1318,9 +1353,13 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u this.state.request_message = null; } - if (this.state.allow_keepalive) { + if (this.state.allow_keepalive and FeatureFlags.enable_keepalive) { std.debug.assert(this.connected_url.hostname.len > 0); - ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto()); + ctx.releaseSocket( + socket, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); } else { socket.close(0, null); } @@ -1335,8 +1374,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u return; } - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1352,8 +1390,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u if (this.state.response_stage == .body) { { const is_done = this.handleResponseBody(pending_buffers[0]) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1365,8 +1402,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u if (pending_buffers[1].len > 0) { const is_done = this.handleResponseBody(pending_buffers[1]) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1378,8 +1414,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } else if (this.state.response_stage == .body_chunk) { { const is_done = this.handleResponseBodyChunk(pending_buffers[0]) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1391,8 +1426,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u if (pending_buffers[1].len > 0) { const is_done = this.handleResponseBodyChunk(pending_buffers[1]) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1410,8 +1444,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u socket.timeout(60); const is_done = this.handleResponseBody(incoming_data) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1425,8 +1458,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u socket.timeout(60); const is_done = this.handleResponseBodyChunk(incoming_data) catch |err| { - socket.close(0, null); - this.fail(err); + this.closeAndFail(err, is_ssl, socket); return; }; @@ -1439,8 +1471,8 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u .fail => {}, else => { - socket.close(0, null); - this.fail(error.UnexpectedData); + this.state.pending_response = .{}; + this.closeAndFail(error.UnexpectedData, is_ssl, socket); return; }, } @@ -1468,9 +1500,14 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss this.state.request_stage = .done; this.state.stage = .done; - if (this.state.allow_keepalive and !socket.isClosed()) { - socket.timeout(60 * 5); - ctx.releaseSocket(socket, this.connected_url.hostname, this.connected_url.getPortAuto()); + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); + + if (this.state.allow_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { + ctx.releaseSocket( + socket, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); } else if (!socket.isClosed()) { socket.close(0, null); } @@ -1481,21 +1518,35 @@ pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ss this.state.response_stage = .done; this.state.request_stage = .done; this.state.stage = .done; - + if (comptime print_every > 0) { + print_every_i += 1; + if (print_every_i % print_every == 0) { + Output.prettyln("Heap stats for HTTP thread\n", .{}); + Output.flush(); + default_arena.dumpThreadStats(); + print_every_i = 0; + } + } callback.run(result); } pub const HTTPClientResult = struct { body: ?*MutableString = null, - response: picohttp.Response, + response: picohttp.Response = .{}, metadata_buf: []u8 = &.{}, href: []const u8 = "", fail: anyerror = error.NoError, + redirected: bool = false, headers_buf: []picohttp.Header = &.{}, + pub fn isSuccess(this: *const HTTPClientResult) bool { + return this.fail == error.NoError; + } + pub fn deinitMetadata(this: *HTTPClientResult) void { if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf); if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf); + this.headers_buf = &.{}; this.metadata_buf = &.{}; this.href = ""; @@ -1545,15 +1596,30 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult { .body = this.state.body_out_str, .response = response, .metadata_buf = builder.ptr.?[0..builder.cap], + .redirected = this.remaining_redirect_count != default_redirect_count, .href = href, .fail = this.state.fail, .headers_buf = headers_buf, }; } +// preallocate a buffer for the body no more than 256 MB +// the intent is to avoid an OOM caused by a malicious server +// reporting gigantic Conten-Length and then +// never finishing sending the body +const preallocate_max = 1024 * 1024 * 256; + pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool { var buffer = this.state.getBodyBuffer(); + if (buffer.list.items.len == 0 and + this.state.body_size > 0 and this.state.body_size < preallocate_max) + { + // since we don't do streaming yet, we might as well just allocate the whole thing + // when we know the expected size + buffer.list.ensureTotalCapacityPrecise(buffer.allocator, this.state.body_size) catch {}; + } + const remaining_content_length = this.state.body_size - buffer.list.items.len; var remainder = incoming_data[0..@minimum(incoming_data.len, remaining_content_length)]; @@ -1683,7 +1749,7 @@ pub fn handleResponseMetadata( location = header.value; }, hashHeaderName("Connection") => { - if (response.status_code >= 200 and response.status_code <= 299 and !KeepAlive.disabled) { + if (response.status_code >= 200 and response.status_code <= 299) { if (!strings.eqlComptime(header.value, "keep-alive")) { this.state.allow_keepalive = false; } diff --git a/src/install/install.zig b/src/install/install.zig index f0929cd3d..93b3def58 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -317,7 +317,6 @@ const NetworkTask = struct { header_builder.content = GlobalStringBuilder{ .ptr = @intToPtr([*]u8, @ptrToInt(std.mem.span(default_headers_buf).ptr)), .len = default_headers_buf.len, .cap = default_headers_buf.len }; } - this.request_buffer = try MutableString.init(allocator, 0); this.response_buffer = try MutableString.init(allocator, 0); this.allocator = allocator; this.http = AsyncHTTP.init( @@ -327,7 +326,7 @@ const NetworkTask = struct { header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len], &this.response_buffer, - &this.request_buffer, + "", 0, this.getCompletionCallback(), ); @@ -376,7 +375,6 @@ const NetworkTask = struct { this.url_buf = tarball.url; } - this.request_buffer = try MutableString.init(allocator, 0); this.response_buffer = try MutableString.init(allocator, 0); this.allocator = allocator; @@ -410,7 +408,7 @@ const NetworkTask = struct { header_builder.entries, header_buf, &this.response_buffer, - &this.request_buffer, + "", 0, this.getCompletionCallback(), ); diff --git a/src/mimalloc_arena.zig b/src/mimalloc_arena.zig index c8117b6f5..0f54cdea6 100644 --- a/src/mimalloc_arena.zig +++ b/src/mimalloc_arena.zig @@ -31,9 +31,14 @@ pub const Arena = struct { pub fn deinit(this: *Arena) void { mimalloc.mi_heap_destroy(this.heap); + this.heap = null; } + pub fn dumpThreadStats(_: *Arena) void { + mimalloc.mi_thread_stats_print_out(null, null); + } + pub fn reset(this: *Arena) void { this.deinit(); this.* = init() catch unreachable; diff --git a/src/napi/napi.zig b/src/napi/napi.zig index 84a1008e9..54e16d153 100644 --- a/src/napi/napi.zig +++ b/src/napi/napi.zig @@ -10,14 +10,39 @@ const TODO_EXCEPTION: JSC.C.ExceptionRef = null; const Channel = @import("../sync.zig").Channel; pub const napi_env = *JSC.JSGlobalObject; -pub const napi_ref = struct_napi_ref__; +pub const Ref = opaque { + pub fn create(globalThis: *JSC.JSGlobalObject, value: JSValue) *Ref { + var ref: *Ref = undefined; + std.debug.assert( + napi_create_reference( + globalThis, + value, + 1, + &ref, + ) == .ok, + ); + if (comptime bun.Environment.isDebug) { + std.debug.assert(ref.get(globalThis) == value); + } + return ref; + } + + pub fn get(ref: *Ref, globalThis: *JSC.JSGlobalObject) JSValue { + var value: JSValue = JSValue.zero; + std.debug.assert(napi_get_reference_value(globalThis, ref, &value) == .ok); + return value; + } + + pub fn destroy(ref: *Ref, globalThis: *JSC.JSGlobalObject) void { + std.debug.assert(napi_delete_reference(globalThis, ref) == .ok); + } +}; pub const napi_handle_scope = napi_env; pub const napi_escapable_handle_scope = struct_napi_escapable_handle_scope__; pub const napi_callback_info = *JSC.CallFrame; pub const napi_deferred = *JSC.JSPromise; pub const napi_value = JSC.JSValue; -pub const struct_napi_ref__ = opaque {}; pub const struct_napi_escapable_handle_scope__ = opaque {}; pub const struct_napi_deferred__ = opaque {}; @@ -629,16 +654,16 @@ pub extern fn napi_define_class( properties: [*c]const napi_property_descriptor, result: *napi_value, ) napi_status; -pub extern fn napi_wrap(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: [*c]napi_ref) napi_status; +pub extern fn napi_wrap(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: [*c]Ref) napi_status; pub extern fn napi_unwrap(env: napi_env, js_object: napi_value, result: [*]*anyopaque) napi_status; pub extern fn napi_remove_wrap(env: napi_env, js_object: napi_value, result: [*]*anyopaque) napi_status; pub extern fn napi_create_external(env: napi_env, data: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *napi_value) napi_status; pub extern fn napi_get_value_external(env: napi_env, value: napi_value, result: [*]*anyopaque) napi_status; -pub extern fn napi_create_reference(env: napi_env, value: napi_value, initial_refcount: u32, result: [*c]napi_ref) napi_status; -pub extern fn napi_delete_reference(env: napi_env, ref: napi_ref) napi_status; -pub extern fn napi_reference_ref(env: napi_env, ref: napi_ref, result: [*c]u32) napi_status; -pub extern fn napi_reference_unref(env: napi_env, ref: napi_ref, result: [*c]u32) napi_status; -pub extern fn napi_get_reference_value(env: napi_env, ref: napi_ref, result: *napi_value) napi_status; +pub extern fn napi_create_reference(env: napi_env, value: napi_value, initial_refcount: u32, result: **Ref) napi_status; +pub extern fn napi_delete_reference(env: napi_env, ref: *Ref) napi_status; +pub extern fn napi_reference_ref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status; +pub extern fn napi_reference_unref(env: napi_env, ref: *Ref, result: [*c]u32) napi_status; +pub extern fn napi_get_reference_value(env: napi_env, ref: *Ref, result: *napi_value) napi_status; // JSC scans the stack // we don't need this @@ -818,7 +843,7 @@ pub export fn napi_get_date_value(env: napi_env, value: napi_value, result: *f64 ).asNumber(); return .ok; } -pub extern fn napi_add_finalizer(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *napi_ref) napi_status; +pub extern fn napi_add_finalizer(env: napi_env, js_object: napi_value, native_object: ?*anyopaque, finalize_cb: napi_finalize, finalize_hint: ?*anyopaque, result: *Ref) napi_status; pub export fn napi_create_bigint_int64(env: napi_env, value: i64, result: *napi_value) napi_status { result.* = JSC.JSValue.fromInt64NoTruncate(env, value); return .ok; @@ -853,6 +878,7 @@ const WorkPoolTask = @import("../work_pool.zig").Task; /// must be globally allocated pub const napi_async_work = struct { task: WorkPoolTask = .{ .callback = runFromThreadPool }, + concurrent_task: JSC.ConcurrentTask = .{}, completion_task: ?*anyopaque = null, event_loop: *JSC.EventLoop, global: napi_env, @@ -900,7 +926,7 @@ pub const napi_async_work = struct { this.execute.?(this.global, this.ctx); this.status.store(@enumToInt(Status.completed), .SeqCst); - this.event_loop.enqueueTaskConcurrent(JSC.Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn schedule(this: *napi_async_work) void { @@ -1139,6 +1165,8 @@ pub const ThreadSafeFunction = struct { owning_threads: std.AutoArrayHashMapUnmanaged(u64, void) = .{}, owning_thread_lock: Lock = Lock.init(), event_loop: *JSC.EventLoop, + concurrent_task: JSC.ConcurrentTask = .{}, + concurrent_finalizer_task: JSC.ConcurrentTask = .{}, javascript_function: JSValue, finalizer_task: JSC.AnyTask = undefined, @@ -1243,7 +1271,7 @@ pub const ThreadSafeFunction = struct { } } - this.event_loop.enqueueTaskConcurrent(JSC.Task.init(this)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this)); } pub fn finalize(opaq: *anyopaque) void { @@ -1284,7 +1312,7 @@ pub const ThreadSafeFunction = struct { if (this.owning_threads.count() == 0) { this.finalizer_task = JSC.AnyTask{ .ctx = this, .callback = finalize }; - this.event_loop.enqueueTaskConcurrent(JSC.Task.init(&this.finalizer_task)); + this.event_loop.enqueueTaskConcurrent(this.concurrent_finalizer_task.from(&this.finalizer_task)); return; } } diff --git a/src/pool.zig b/src/pool.zig index 7894a5758..344e9ca69 100644 --- a/src/pool.zig +++ b/src/pool.zig @@ -179,6 +179,10 @@ pub fn ObjectPool( return node; } + pub fn first(allocator: std.mem.Allocator) *Type { + return &get(allocator).data; + } + pub fn get(allocator: std.mem.Allocator) *LinkedList.Node { if (data().loaded) { if (data().list.popFirst()) |node| { @@ -204,6 +208,10 @@ pub fn ObjectPool( return new_node; } + pub fn releaseValue(value: *Type) void { + @fieldParentPtr(LinkedList.Node, "data", value).release(); + } + pub fn release(node: *LinkedList.Node) void { if (comptime max_count > 0) { if (data().count >= max_count) { diff --git a/src/thread_pool.zig b/src/thread_pool.zig index 8839d2090..eac9a2055 100644 --- a/src/thread_pool.zig +++ b/src/thread_pool.zig @@ -93,6 +93,8 @@ pub const Batch = struct { if (task.node.next) |node| { this.head = @fieldParentPtr(Task, "node", node); } else { + if (task != this.tail.?) unreachable; + this.tail = null; this.head = null; } diff --git a/test/bun.js/buffer.test.js b/test/bun.js/buffer.test.js index 47cdbe210..c3f422024 100644 --- a/test/bun.js/buffer.test.js +++ b/test/bun.js/buffer.test.js @@ -222,7 +222,7 @@ it("Buffer.copy", () => { { // Create two `Buffer` instances. const buf1 = Buffer.allocUnsafe(26); - const buf2 = Buffer.allocUnsafe(26).fill('!'); + const buf2 = Buffer.allocUnsafe(26).fill("!"); for (let i = 0; i < 26; i++) { // 97 is the decimal ASCII value for 'a'. @@ -231,7 +231,7 @@ it("Buffer.copy", () => { // Copy `buf1` bytes 16 through 19 into `buf2` starting at byte 8 of `buf2`. buf1.copy(buf2, 8, 16, 20); - expect(buf2.toString('ascii', 0, 25)).toBe('!!!!!!!!qrst!!!!!!!!!!!!!'); + expect(buf2.toString("ascii", 0, 25)).toBe("!!!!!!!!qrst!!!!!!!!!!!!!"); } { @@ -243,7 +243,7 @@ it("Buffer.copy", () => { } buf.copy(buf, 0, 4, 10); - expect(buf.toString()).toBe('efghijghijklmnopqrstuvwxyz'); + expect(buf.toString()).toBe("efghijghijklmnopqrstuvwxyz"); } }); @@ -427,33 +427,33 @@ it("write", () => { }); it("includes", () => { - const buf = Buffer.from('this is a buffer'); + const buf = Buffer.from("this is a buffer"); - expect(buf.includes('this')).toBe(true); - expect(buf.includes('is')).toBe(true); - expect(buf.includes(Buffer.from('a buffer'))).toBe(true); + expect(buf.includes("this")).toBe(true); + expect(buf.includes("is")).toBe(true); + expect(buf.includes(Buffer.from("a buffer"))).toBe(true); expect(buf.includes(97)).toBe(true); - expect(buf.includes(Buffer.from('a buffer example'))).toBe(false); - expect(buf.includes(Buffer.from('a buffer example').slice(0, 8))).toBe(true); - expect(buf.includes('this', 4)).toBe(false); + expect(buf.includes(Buffer.from("a buffer example"))).toBe(false); + expect(buf.includes(Buffer.from("a buffer example").slice(0, 8))).toBe(true); + expect(buf.includes("this", 4)).toBe(false); }); it("indexOf", () => { - const buf = Buffer.from('this is a buffer'); + const buf = Buffer.from("this is a buffer"); - expect(buf.indexOf('this')).toBe(0); - expect(buf.indexOf('is')).toBe(2); - expect(buf.indexOf(Buffer.from('a buffer'))).toBe(8); + expect(buf.indexOf("this")).toBe(0); + expect(buf.indexOf("is")).toBe(2); + expect(buf.indexOf(Buffer.from("a buffer"))).toBe(8); expect(buf.indexOf(97)).toBe(8); - expect(buf.indexOf(Buffer.from('a buffer example'))).toBe(-1); - expect(buf.indexOf(Buffer.from('a buffer example').slice(0, 8))).toBe(8); + expect(buf.indexOf(Buffer.from("a buffer example"))).toBe(-1); + expect(buf.indexOf(Buffer.from("a buffer example").slice(0, 8))).toBe(8); - const utf16Buffer = Buffer.from('\u039a\u0391\u03a3\u03a3\u0395', 'utf16le'); + const utf16Buffer = Buffer.from("\u039a\u0391\u03a3\u03a3\u0395", "utf16le"); - expect(utf16Buffer.indexOf('\u03a3', 0, 'utf16le')).toBe(4); - expect(utf16Buffer.indexOf('\u03a3', -4, 'utf16le')).toBe(6); + expect(utf16Buffer.indexOf("\u03a3", 0, "utf16le")).toBe(4); + expect(utf16Buffer.indexOf("\u03a3", -4, "utf16le")).toBe(6); - const b = Buffer.from('abcdef'); + const b = Buffer.from("abcdef"); // Passing a value that's a number, but not a valid byte. // Prints: 2, equivalent to searching for 99 or 'c'. @@ -462,31 +462,31 @@ it("indexOf", () => { // Passing a byteOffset that coerces to NaN or 0. // Prints: 1, searching the whole buffer. - expect(b.indexOf('b', undefined)).toBe(1); - expect(b.indexOf('b', {})).toBe(1); - expect(b.indexOf('b', null)).toBe(1); - expect(b.indexOf('b', [])).toBe(1); + expect(b.indexOf("b", undefined)).toBe(1); + expect(b.indexOf("b", {})).toBe(1); + expect(b.indexOf("b", null)).toBe(1); + expect(b.indexOf("b", [])).toBe(1); }); it("lastIndexOf", () => { - const buf = Buffer.from('this buffer is a buffer'); + const buf = Buffer.from("this buffer is a buffer"); - expect(buf.lastIndexOf('this')).toBe(0); - expect(buf.lastIndexOf('this', 0)).toBe(0); - expect(buf.lastIndexOf('this', -1000)).toBe(-1); - expect(buf.lastIndexOf('buffer')).toBe(17); - expect(buf.lastIndexOf(Buffer.from('buffer'))).toBe(17); + expect(buf.lastIndexOf("this")).toBe(0); + expect(buf.lastIndexOf("this", 0)).toBe(0); + expect(buf.lastIndexOf("this", -1000)).toBe(-1); + expect(buf.lastIndexOf("buffer")).toBe(17); + expect(buf.lastIndexOf(Buffer.from("buffer"))).toBe(17); expect(buf.lastIndexOf(97)).toBe(15); - expect(buf.lastIndexOf(Buffer.from('yolo'))).toBe(-1); - expect(buf.lastIndexOf('buffer', 5)).toBe(5); - expect(buf.lastIndexOf('buffer', 4)).toBe(-1); + expect(buf.lastIndexOf(Buffer.from("yolo"))).toBe(-1); + expect(buf.lastIndexOf("buffer", 5)).toBe(5); + expect(buf.lastIndexOf("buffer", 4)).toBe(-1); - const utf16Buffer = Buffer.from('\u039a\u0391\u03a3\u03a3\u0395', 'utf16le'); + const utf16Buffer = Buffer.from("\u039a\u0391\u03a3\u03a3\u0395", "utf16le"); - expect(utf16Buffer.lastIndexOf('\u03a3', undefined, 'utf16le')).toBe(6); - expect(utf16Buffer.lastIndexOf('\u03a3', -5, 'utf16le')).toBe(4); + expect(utf16Buffer.lastIndexOf("\u03a3", undefined, "utf16le")).toBe(6); + expect(utf16Buffer.lastIndexOf("\u03a3", -5, "utf16le")).toBe(4); - const b = Buffer.from('abcdef'); + const b = Buffer.from("abcdef"); // Passing a value that's a number, but not a valid byte. // Prints: 2, equivalent to searching for 99 or 'c'. @@ -495,11 +495,11 @@ it("lastIndexOf", () => { // Passing a byteOffset that coerces to NaN or 0. // Prints: 1, searching the whole buffer. - expect(b.lastIndexOf('b', undefined)).toBe(1); - expect(b.lastIndexOf('b', {})).toBe(1); + expect(b.lastIndexOf("b", undefined)).toBe(1); + expect(b.lastIndexOf("b", {})).toBe(1); // Passing a byteOffset that coerces to 0. // Prints: -1, equivalent to passing 0. - expect(b.lastIndexOf('b', null)).toBe(-1); - expect(b.lastIndexOf('b', [])).toBe(-1); + expect(b.lastIndexOf("b", null)).toBe(-1); + expect(b.lastIndexOf("b", [])).toBe(-1); }); diff --git a/test/bun.js/fetch.test.js b/test/bun.js/fetch.test.js index 9b6093afd..a4ab0bfa6 100644 --- a/test/bun.js/fetch.test.js +++ b/test/bun.js/fetch.test.js @@ -255,6 +255,50 @@ describe("Blob", () => { } }); +{ + const sample = new TextEncoder().encode("Hello World!"); + const typedArrays = [ + Uint8Array, + Uint8ClampedArray, + Int8Array, + Uint16Array, + Int16Array, + Uint32Array, + Int32Array, + Float32Array, + Float64Array, + ]; + const Constructors = [Blob, Response, Request]; + + for (let withGC of [false, true]) { + for (let TypedArray of typedArrays) { + for (let Constructor of Constructors) { + it(`${Constructor.name} arrayBuffer() with ${TypedArray.name}${ + withGC ? " with gc" : "" + }`, async () => { + const data = new TypedArray(sample); + if (withGC) gc(); + const input = + Constructor === Blob + ? [data] + : Constructor === Request + ? { body: data } + : data; + if (withGC) gc(); + const blob = new Constructor(input); + if (withGC) gc(); + const out = await blob.arrayBuffer(); + if (withGC) gc(); + expect(out instanceof ArrayBuffer).toBe(true); + if (withGC) gc(); + expect(out.byteLength).toBe(data.byteLength); + if (withGC) gc(); + }); + } + } + } +} + describe("Response", () => { describe("Response.json", () => { it("works", async () => { diff --git a/test/bun.js/serve.test.ts b/test/bun.js/serve.test.ts index 99b667a2d..ba505feba 100644 --- a/test/bun.js/serve.test.ts +++ b/test/bun.js/serve.test.ts @@ -158,7 +158,6 @@ describe("streaming", () => { }); const response = await fetch(`http://localhost:${server.port}`); - console.log("here"); expect(response.status).toBe(500); } catch (e) { if (!e || !(e instanceof TestPass)) { diff --git a/test/bun.js/transpiler.test.js b/test/bun.js/transpiler.test.js index 3bb458697..3d3bcc109 100644 --- a/test/bun.js/transpiler.test.js +++ b/test/bun.js/transpiler.test.js @@ -141,22 +141,22 @@ describe("Bun.Transpiler", () => { x[x["y"] = 0] = "y"; })(x || (x = {})); })(second = first.second || (first.second = {})); -})(first || (first = {}))` +})(first || (first = {}))`; it("exported inner namespace", () => { ts.expectPrinted_(input3, output3); }); - const input4 = `export enum x { y }` + const input4 = `export enum x { y }`; const output4 = `export var x; (function(x) { x[x["y"] = 0] = "y"; -})(x || (x = {}))` +})(x || (x = {}))`; it("exported enum", () => { ts.expectPrinted_(input4, output4); }); - }) + }); describe("exports.replace", () => { const transpiler = new Bun.Transpiler({ |