aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/bun.zig34
-rw-r--r--src/bun.js/api/server.zig50
-rw-r--r--src/bun.js/base.zig41
-rw-r--r--src/bun.js/bindings/JSBuffer.cpp7
-rw-r--r--src/bun.js/event_loop.zig254
-rw-r--r--src/bun.js/javascript.zig19
-rw-r--r--src/bun.js/unbounded_queue.zig149
-rw-r--r--src/bun.js/webcore/response.zig280
-rw-r--r--src/bun.js/webcore/streams.zig3
9 files changed, 470 insertions, 367 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index a37d5d62c..3fafdc177 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -1101,6 +1101,9 @@ pub const Class = NewClass(
.nanoseconds = .{
.rfn = nanoseconds,
},
+ .DO_NOT_USE_OR_YOU_WILL_BE_FIRED_mimalloc_dump = .{
+ .rfn = dump_mimalloc,
+ },
.gzipSync = .{
.rfn = JSC.wrapWithHasContainer(JSZlib, "gzipSync", false, false, true),
},
@@ -1191,6 +1194,18 @@ pub const Class = NewClass(
},
);
+fn dump_mimalloc(
+ _: void,
+ globalThis: JSC.C.JSContextRef,
+ _: JSC.C.JSObjectRef,
+ _: JSC.C.JSObjectRef,
+ _: []const JSC.C.JSValueRef,
+ _: JSC.C.ExceptionRef,
+) JSC.C.JSValueRef {
+ globalThis.bunVM().arena.dumpThreadStats();
+ return JSC.JSValue.jsUndefined().asObjectRef();
+}
+
pub const Crypto = struct {
const Hashers = @import("../../sha.zig");
@@ -2101,6 +2116,8 @@ pub const Timer = struct {
return VirtualMachine.vm.timer.last_id;
}
+ const Pool = bun.ObjectPool(Timeout, null, true, 1000);
+
pub const Timeout = struct {
id: i32 = 0,
callback: JSValue,
@@ -2134,11 +2151,13 @@ pub const Timer = struct {
if (comptime JSC.is_bindgen)
unreachable;
+ var vm = global.bunVM();
+
if (!this.cancelled) {
if (this.repeat) {
this.io_task.?.deinit();
- var task = Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, global, this) catch unreachable;
- VirtualMachine.vm.timer.timeouts.put(VirtualMachine.vm.allocator, this.id, this) catch unreachable;
+ var task = Timeout.TimeoutTask.createOnJSThread(vm.allocator, global, this) catch unreachable;
+ vm.timer.timeouts.put(vm.allocator, this.id, this) catch unreachable;
this.io_task = task;
task.schedule();
}
@@ -2148,12 +2167,12 @@ pub const Timer = struct {
if (this.repeat)
return;
- VirtualMachine.vm.timer.active -|= 1;
- VirtualMachine.vm.active_tasks -|= 1;
+ vm.timer.active -|= 1;
+ vm.active_tasks -|= 1;
} else {
// the active tasks count is already cleared for canceled timeout,
// add one here to neutralize the `-|= 1` in event loop.
- VirtualMachine.vm.active_tasks +|= 1;
+ vm.active_tasks +|= 1;
}
this.clear(global);
@@ -2168,8 +2187,9 @@ pub const Timer = struct {
_ = VirtualMachine.vm.timer.timeouts.swapRemove(this.id);
if (this.io_task) |task| {
task.deinit();
+ this.io_task = null;
}
- VirtualMachine.vm.allocator.destroy(this);
+ Pool.releaseValue(this);
}
};
@@ -2181,7 +2201,7 @@ pub const Timer = struct {
repeat: bool,
) !void {
if (comptime is_bindgen) unreachable;
- var timeout = try VirtualMachine.vm.allocator.create(Timeout);
+ var timeout = Pool.first(globalThis.bunVM().allocator);
js.JSValueProtect(globalThis.ref(), callback.asObjectRef());
timeout.* = Timeout{ .id = id, .callback = callback, .interval = countdown.toInt32(), .repeat = repeat };
var task = try Timeout.TimeoutTask.createOnJSThread(VirtualMachine.vm.allocator, globalThis, timeout);
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index b79e6c7ab..55b829caa 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1850,6 +1850,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
has_js_deinited: bool = false,
listen_callback: JSC.AnyTask = undefined,
allocator: std.mem.Allocator,
+ keeping_js_alive: bool = false,
pub const Class = JSC.NewClass(
ThisServer,
@@ -1917,15 +1918,17 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
}
pub fn deinitIfWeCan(this: *ThisServer) void {
- if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited)
+ if (this.pending_requests == 0 and this.listener == null and this.has_js_deinited) {
+ this.deref();
this.deinit();
+ }
}
pub fn stop(this: *ThisServer) void {
if (this.listener) |listener| {
- listener.close();
this.listener = null;
- this.vm.disable_run_us_loop = false;
+ this.deref();
+ listener.close();
}
this.deinitIfWeCan();
@@ -2038,22 +2041,26 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
this.listener = socket;
const needs_post_handler = this.vm.uws_event_loop == null;
this.vm.uws_event_loop = uws.Loop.get();
- this.listen_callback = JSC.AnyTask.New(ThisServer, run).init(this);
- this.vm.eventLoop().enqueueTask(JSC.Task.init(&this.listen_callback));
+ this.ref();
+
if (needs_post_handler) {
_ = this.vm.uws_event_loop.?.addPostHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick);
+ _ = this.vm.uws_event_loop.?.addPreHandler(*JSC.EventLoop, this.vm.eventLoop(), JSC.EventLoop.tick);
}
}
- pub fn run(this: *ThisServer) void {
- // this.app.addServerName(hostname_pattern: [*:0]const u8)
+ pub fn ref(this: *ThisServer) void {
+ if (this.keeping_js_alive) return;
+
+ this.vm.us_loop_reference_count +|= 1;
+ this.keeping_js_alive = true;
+ }
- // we do not increment the reference count here
- // uWS manages running the loop, so it is unnecessary
- // this.vm.us_loop_reference_count +|= 1;
- this.vm.disable_run_us_loop = true;
+ pub fn deref(this: *ThisServer) void {
+ if (!this.keeping_js_alive) return;
- this.app.run();
+ this.vm.us_loop_reference_count -|= 1;
+ this.keeping_js_alive = false;
}
pub fn onBunInfoRequest(this: *ThisServer, req: *uws.Request, resp: *App.Response) void {
@@ -2286,11 +2293,20 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
this.app.get("/src:/*", *ThisServer, this, onSrcRequest);
}
- this.app.listenWithConfig(*ThisServer, this, onListen, .{
- .port = this.config.port,
- .host = this.config.hostname,
- .options = 0,
- });
+ const hostname = bun.span(this.config.hostname);
+
+ if (!(hostname.len == 0 or strings.eqlComptime(hostname, "0.0.0.0"))) {
+ this.app.listenWithConfig(*ThisServer, this, onListen, .{
+ .port = this.config.port,
+ .options = 0,
+ });
+ } else {
+ this.app.listenWithConfig(*ThisServer, this, onListen, .{
+ .port = this.config.port,
+ .host = this.config.hostname,
+ .options = 0,
+ });
+ }
}
};
}
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 2ded4e5cb..6a4cc7469 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -2770,7 +2770,6 @@ pub fn castObj(obj: js.JSObjectRef, comptime Type: type) *Type {
const JSNode = @import("../js_ast.zig").Macro.JSNode;
const LazyPropertiesObject = @import("../js_ast.zig").Macro.LazyPropertiesObject;
const ModuleNamespace = @import("../js_ast.zig").Macro.ModuleNamespace;
-const FetchTaskletContext = Fetch.FetchTasklet.FetchTaskletContext;
const Expect = Test.Expect;
const DescribeScope = Test.DescribeScope;
const TestScope = Test.TestScope;
@@ -2824,7 +2823,6 @@ pub const JSPrivateDataPtr = TaggedPointerUnion(.{
Expect,
ExpectPrototype,
FetchEvent,
- FetchTaskletContext,
HTMLRewriter,
JSNode,
LazyPropertiesObject,
@@ -3497,25 +3495,26 @@ pub fn wrapWithHasContainer(
}
if (comptime maybe_async) {
- var vm = ctx.ptr().bunVM();
- vm.tick();
-
- var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result);
-
- switch (promise.status(ctx.ptr().vm())) {
- JSC.JSPromise.Status.Pending => {
- while (promise.status(ctx.ptr().vm()) == .Pending) {
- vm.tick();
- }
- result = promise.result(ctx.ptr().vm());
- },
- JSC.JSPromise.Status.Rejected => {
- result = promise.result(ctx.ptr().vm());
- exception.* = result.asObjectRef();
- },
- JSC.JSPromise.Status.Fulfilled => {
- result = promise.result(ctx.ptr().vm());
- },
+ if (result.asPromise() != null or result.asInternalPromise() != null) {
+ var vm = ctx.ptr().bunVM();
+ vm.tick();
+ var promise = JSC.JSInternalPromise.resolvedPromise(ctx.ptr(), result);
+
+ switch (promise.status(ctx.ptr().vm())) {
+ JSC.JSPromise.Status.Pending => {
+ while (promise.status(ctx.ptr().vm()) == .Pending) {
+ vm.tick();
+ }
+ result = promise.result(ctx.ptr().vm());
+ },
+ JSC.JSPromise.Status.Rejected => {
+ result = promise.result(ctx.ptr().vm());
+ exception.* = result.asObjectRef();
+ },
+ JSC.JSPromise.Status.Fulfilled => {
+ result = promise.result(ctx.ptr().vm());
+ },
+ }
}
}
diff --git a/src/bun.js/bindings/JSBuffer.cpp b/src/bun.js/bindings/JSBuffer.cpp
index 4b509f257..a1b9a5d40 100644
--- a/src/bun.js/bindings/JSBuffer.cpp
+++ b/src/bun.js/bindings/JSBuffer.cpp
@@ -248,8 +248,8 @@ static EncodedJSValue constructFromEncoding(JSGlobalObject* lexicalGlobalObject,
result = Bun__encoding__constructFromLatin1(lexicalGlobalObject, view.characters8(), view.length(), static_cast<uint8_t>(encoding));
break;
}
- case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1
- case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion.
+ case WebCore::BufferEncodingType::ascii: // ascii is a noop for latin1
+ case WebCore::BufferEncodingType::latin1: { // The native encoding is latin1, so we don't need to do any conversion.
result = JSBuffer__bufferFromPointerAndLength(lexicalGlobalObject, view.characters8(), view.length());
break;
}
@@ -1216,7 +1216,7 @@ static inline JSC::EncodedJSValue jsBufferPrototypeFunction_writeBody(JSC::JSGlo
if (callFrame->argumentCount() > 2) {
uint32_t arg_len = 0;
arg_len = callFrame->argument(2).toUInt32(lexicalGlobalObject);
- length = std::min(arg_len, length-offset);
+ length = std::min(arg_len, length - offset);
}
if (callFrame->argumentCount() > 2) {
@@ -1329,7 +1329,6 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSBufferConstructor::construc
JSC::JSObject* constructor = lexicalGlobalObject->m_typedArrayUint8.constructor(lexicalGlobalObject);
- // TODO: avoid this copy
MarkedArgumentBuffer args;
for (size_t i = 0; i < argsCount; ++i)
args.append(callFrame->uncheckedArgument(i));
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 51d55d2e0..a68376872 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -33,6 +33,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
allocator: std.mem.Allocator,
promise: JSValue,
globalThis: *JSGlobalObject,
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -75,68 +76,7 @@ pub fn ConcurrentPromiseTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- this.allocator.destroy(this);
- }
- };
-}
-
-pub fn SerialPromiseTask(comptime Context: type) type {
- return struct {
- const SerialWorkPool = @import("../work_pool.zig").NewWorkPool(1);
- const This = @This();
-
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- promise: JSValue,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.event_loop,
- .ctx = value,
- .allocator = allocator,
- .promise = JSValue.createInternalPromise(globalThis),
- .globalThis = globalThis,
- };
- js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx);
- this.onFinish();
- }
-
- pub fn runFromJS(this: This) void {
- var promise_value = this.promise;
- var promise = promise_value.asInternalPromise() orelse {
- if (comptime @hasDecl(Context, "deinit")) {
- @call(.{}, Context.deinit, .{this.ctx});
- }
- return;
- };
-
- var ctx = this.ctx;
-
- js.JSValueUnprotect(this.globalThis.ref(), promise_value.asObjectRef());
- ctx.then(promise, this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- SerialWorkPool.schedule(&this.task);
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -153,6 +93,7 @@ pub fn IOTask(comptime Context: type) type {
event_loop: *JSC.EventLoop,
allocator: std.mem.Allocator,
globalThis: *JSGlobalObject,
+ concurrent_task: ConcurrentTask = .{},
pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
var this = try allocator.create(This);
@@ -182,53 +123,7 @@ pub fn IOTask(comptime Context: type) type {
}
pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
- }
-
- pub fn deinit(this: *This) void {
- var allocator = this.allocator;
- this.* = undefined;
- allocator.destroy(this);
- }
- };
-}
-
-pub fn AsyncNativeCallbackTask(comptime Context: type) type {
- return struct {
- const This = @This();
- ctx: *Context,
- task: WorkPoolTask = .{ .callback = runFromThreadPool },
- event_loop: *JSC.EventLoop,
- allocator: std.mem.Allocator,
- globalThis: *JSGlobalObject,
-
- pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
- var this = try allocator.create(This);
- this.* = .{
- .event_loop = VirtualMachine.vm.eventLoop(),
- .ctx = value,
- .allocator = allocator,
- .globalThis = globalThis,
- };
- VirtualMachine.vm.active_tasks +|= 1;
- return this;
- }
-
- pub fn runFromThreadPool(task: *WorkPoolTask) void {
- var this = @fieldParentPtr(This, "task", task);
- Context.run(this.ctx, this);
- }
-
- pub fn runFromJS(this: This) void {
- this.ctx.runFromJS(this.globalThis);
- }
-
- pub fn schedule(this: *This) void {
- WorkPool.get().schedule(WorkPool.schedule(&this.task));
- }
-
- pub fn onFinish(this: *This) void {
- this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ this.event_loop.enqueueTaskConcurrent(this.concurrent_task.from(this));
}
pub fn deinit(this: *This) void {
@@ -290,95 +185,97 @@ pub const Task = TaggedPointerUnion(.{
// PromiseTask,
// TimeoutTasklet,
});
+const UnboundedQueue = @import("./unbounded_queue.zig").UnboundedQueue;
+pub const ConcurrentTask = struct {
+ task: Task = undefined,
+ next: ?*ConcurrentTask = null,
+
+ pub const Queue = UnboundedQueue(ConcurrentTask, .next);
+
+ pub fn from(this: *ConcurrentTask, of: anytype) *ConcurrentTask {
+ this.* = .{
+ .task = Task.init(of),
+ .next = null,
+ };
+ return this;
+ }
+};
const AsyncIO = @import("io");
pub const EventLoop = struct {
- ready_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- pending_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
- io_tasks_count: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
tasks: Queue = undefined,
- concurrent_tasks: Queue = undefined,
- concurrent_lock: Lock = Lock.init(),
+ concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
global: *JSGlobalObject = undefined,
virtual_machine: *VirtualMachine = undefined,
waker: ?AsyncIO.Waker = null,
-
+ defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
pub fn tickWithCount(this: *EventLoop) u32 {
- var finished: u32 = 0;
var global = this.global;
var global_vm = global.vm();
var vm_ = this.virtual_machine;
+ var counter: usize = 0;
while (this.tasks.readItem()) |task| {
+ defer counter += 1;
switch (task.tag()) {
.Microtask => {
var micro: *Microtask = task.as(Microtask);
micro.run(global);
- finished += 1;
},
.MicrotaskForDefaultGlobalObject => {
var micro: *MicrotaskForDefaultGlobalObject = task.as(MicrotaskForDefaultGlobalObject);
micro.run(global);
- finished += 1;
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onDone();
- finished += 1;
+ fetch_task.deinit();
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(CopyFilePromiseTask)) => {
var transform_task: *CopyFilePromiseTask = task.get(CopyFilePromiseTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(JSC.napi.napi_async_work))) => {
var transform_task: *JSC.napi.napi_async_work = task.get(JSC.napi.napi_async_work).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(BunTimerTimeoutTask)) => {
var transform_task: *BunTimerTimeoutTask = task.get(BunTimerTimeoutTask).?;
transform_task.*.runFromJS();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(ReadFileTask)) => {
var transform_task: *ReadFileTask = task.get(ReadFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, @typeName(WriteFileTask)) => {
var transform_task: *WriteFileTask = task.get(WriteFileTask).?;
transform_task.*.runFromJS();
transform_task.deinit();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(AnyTask))) => {
var any: *AnyTask = task.get(AnyTask).?;
any.run();
- finished += 1;
vm_.active_tasks -|= 1;
},
@field(Task.Tag, typeBaseName(@typeName(CppTask))) => {
var any: *CppTask = task.get(CppTask).?;
any.run(global);
- finished += 1;
vm_.active_tasks -|= 1;
},
else => if (Environment.allow_assert) {
@@ -390,30 +287,39 @@ pub const EventLoop = struct {
global_vm.drainMicrotasks();
}
- if (finished > 0) {
- _ = this.pending_tasks_count.fetchSub(finished, .Monotonic);
+ if (this.tasks.count == 0) {
+ this.tasks.head = 0;
}
- return finished;
+ return @truncate(u32, counter);
}
pub fn tickConcurrent(this: *EventLoop) void {
- if (this.ready_tasks_count.load(.Monotonic) > 0) {
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
+ _ = this.tickConcurrentWithCount();
+ }
- // TODO: optimzie
- this.tasks.ensureUnusedCapacity(add) catch unreachable;
+ pub fn tickConcurrentWithCount(this: *EventLoop) usize {
+ var concurrent = this.concurrent_tasks.popBatch();
+ const count = concurrent.count;
+ if (count == 0)
+ return 0;
- {
- this.tasks.writeAssumeCapacity(this.concurrent_tasks.readableSlice(0));
- this.concurrent_tasks.discard(this.concurrent_tasks.count);
- }
+ var iter = concurrent.iterator();
+ const start_count = this.tasks.count;
+ if (start_count == 0) {
+ this.tasks.head = 0;
+ }
- _ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
- _ = this.ready_tasks_count.fetchSub(add, .Monotonic);
+ this.tasks.ensureUnusedCapacity(count) catch unreachable;
+ var writable = this.tasks.writableSlice(0);
+ while (iter.next()) |task| {
+ writable[0] = task.task;
+ writable = writable[1..];
+ this.tasks.count += 1;
+ if (writable.len == 0) break;
}
+
+ return this.tasks.count - start_count;
}
// TODO: fix this technical debt
@@ -423,7 +329,9 @@ pub const EventLoop = struct {
this.tickConcurrent();
var global_vm = ctx.global.vm();
while (true) {
- while (this.tickWithCount() > 0) {} else {
+ while (this.tickWithCount() > 0) {
+ this.tickConcurrent();
+ } else {
global_vm.releaseWeakRefs();
global_vm.drainMicrotasks();
this.tickConcurrent();
@@ -436,13 +344,28 @@ pub const EventLoop = struct {
break;
}
- if (!ctx.disable_run_us_loop and ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ this.global.handleRejectedPromises();
+ }
+
+ pub fn runUSocketsLoop(this: *EventLoop) void {
+ var ctx = this.virtual_machine;
+
+ ctx.global.vm().releaseWeakRefs();
+ ctx.global.vm().drainMicrotasks();
+
+ if (ctx.us_loop_reference_count > 0 and !ctx.is_us_loop_entered) {
+ if (this.tickConcurrentWithCount() > 0) {
+ this.tick();
+ } else if (ctx.uws_event_loop.?.num_polls > 0) {
+ if ((@intCast(c_ulonglong, ctx.uws_event_loop.?.internal_loop_data.iteration_nr) % 1_000) == 1) {
+ _ = ctx.global.vm().runGC(true);
+ }
+ }
+
ctx.is_us_loop_entered = true;
ctx.enterUWSLoop();
ctx.is_us_loop_entered = false;
}
-
- this.global.handleRejectedPromises();
}
// TODO: fix this technical debt
@@ -451,6 +374,10 @@ pub const EventLoop = struct {
JSC.JSPromise.Status.Pending => {
while (promise.status(this.global.vm()) == .Pending) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
},
else => {},
@@ -459,13 +386,20 @@ pub const EventLoop = struct {
pub fn waitForTasks(this: *EventLoop) void {
this.tick();
- while (this.pending_tasks_count.load(.Monotonic) > 0) {
+ while (this.tasks.count > 0) {
this.tick();
+
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
+ } else {
+ if (this.virtual_machine.uws_event_loop != null) {
+ this.runUSocketsLoop();
+ }
}
}
pub fn enqueueTask(this: *EventLoop, task: Task) void {
- _ = this.pending_tasks_count.fetchAdd(1, .Monotonic);
this.tasks.writeItem(task) catch unreachable;
}
@@ -476,19 +410,25 @@ pub const EventLoop = struct {
}
}
- pub fn enqueueTaskConcurrent(this: *EventLoop, task: Task) void {
+ pub fn onDefer(this: *EventLoop) void {
+ this.defer_count.store(0, .Monotonic);
+ this.tick();
+ }
+
+ pub fn enqueueTaskConcurrent(this: *EventLoop, task: *ConcurrentTask) void {
JSC.markBinding();
- this.concurrent_lock.lock();
- defer this.concurrent_lock.unlock();
- this.concurrent_tasks.writeItem(task) catch unreachable;
+
+ this.concurrent_tasks.push(task);
+
if (this.virtual_machine.uws_event_loop) |loop| {
- loop.nextTick(*EventLoop, this, EventLoop.tick);
+ const deferCount = this.defer_count.fetchAdd(1, .Monotonic);
+ if (deferCount == 0) {
+ loop.nextTick(*EventLoop, this, onDefer);
+ }
}
- if (this.ready_tasks_count.fetchAdd(1, .Monotonic) == 0) {
- if (this.waker) |*waker| {
- waker.wake() catch unreachable;
- }
+ if (this.waker) |*waker| {
+ waker.wake() catch unreachable;
}
}
};
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index 5eeff1ba8..4f97a79ad 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -368,7 +368,6 @@ pub const VirtualMachine = struct {
rare_data: ?*JSC.RareData = null,
poller: JSC.Poller = JSC.Poller{},
us_loop_reference_count: usize = 0,
- disable_run_us_loop: bool = false,
is_us_loop_entered: bool = false,
pub fn io(this: *VirtualMachine) *IO {
@@ -423,7 +422,7 @@ pub const VirtualMachine = struct {
this.eventLoop().enqueueTask(task);
}
- pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: Task) void {
+ pub inline fn enqueueTaskConcurrent(this: *VirtualMachine, task: JSC.ConcurrentTask) void {
this.eventLoop().enqueueTaskConcurrent(task);
}
@@ -451,7 +450,7 @@ pub const VirtualMachine = struct {
this.macro_event_loop.tasks.ensureTotalCapacity(16) catch unreachable;
this.macro_event_loop.global = this.global;
this.macro_event_loop.virtual_machine = this;
- this.macro_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator);
+ this.macro_event_loop.concurrent_tasks = .{};
}
this.bundler.options.platform = .bun_macro;
@@ -555,8 +554,7 @@ pub const VirtualMachine = struct {
default_allocator,
);
VirtualMachine.vm.regular_event_loop.tasks.ensureUnusedCapacity(64) catch unreachable;
- VirtualMachine.vm.regular_event_loop.concurrent_tasks = EventLoop.Queue.init(default_allocator);
- VirtualMachine.vm.regular_event_loop.concurrent_tasks.ensureUnusedCapacity(8) catch unreachable;
+ VirtualMachine.vm.regular_event_loop.concurrent_tasks = .{};
VirtualMachine.vm.event_loop = &VirtualMachine.vm.regular_event_loop;
vm.bundler.macro_context = null;
@@ -1435,6 +1433,7 @@ pub const VirtualMachine = struct {
pub fn loadEntryPoint(this: *VirtualMachine, entry_path: string) !*JSInternalPromise {
this.main = entry_path;
try this.entry_point.generate(@TypeOf(this.bundler), &this.bundler, Fs.PathName.init(entry_path), main_file_name);
+ this.eventLoop().ensureWaker();
var promise: *JSInternalPromise = undefined;
@@ -1455,7 +1454,15 @@ pub const VirtualMachine = struct {
promise = JSModuleLoader.loadAndEvaluateModule(this.global, &ZigString.init(this.main));
}
- this.waitForPromise(promise);
+ while (promise.status(this.global.vm()) == .Pending) {
+ this.eventLoop().tick();
+ _ = this.eventLoop().waker.?.wait() catch 0;
+ }
+
+ if (this.us_loop_reference_count > 0) {
+ _ = this.global.vm().runGC(true);
+ this.eventLoop().runUSocketsLoop();
+ }
return promise;
}
diff --git a/src/bun.js/unbounded_queue.zig b/src/bun.js/unbounded_queue.zig
new file mode 100644
index 000000000..fd092290d
--- /dev/null
+++ b/src/bun.js/unbounded_queue.zig
@@ -0,0 +1,149 @@
+const std = @import("std");
+
+const os = std.os;
+const mem = std.mem;
+const meta = std.meta;
+const atomic = std.atomic;
+const builtin = std.builtin;
+const testing = std.testing;
+
+const assert = std.debug.assert;
+
+const mpsc = @This();
+
+pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
+ .x86_64, .aarch64, .powerpc64 => 128,
+ .arm, .mips, .mips64, .riscv64 => 32,
+ .s390x => 256,
+ else => 64,
+};
+
+pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
+ const next = meta.fieldInfo(T, next_field).name;
+
+ return struct {
+ const Self = @This();
+
+ pub const Batch = struct {
+ pub const Iterator = struct {
+ batch: Self.Batch,
+
+ pub fn next(self: *Self.Batch.Iterator) ?*T {
+ if (self.batch.count == 0) return null;
+ const front = self.batch.front orelse unreachable;
+ self.batch.front = @field(front, next);
+ self.batch.count -= 1;
+ return front;
+ }
+ };
+
+ front: ?*T = null,
+ last: ?*T = null,
+ count: usize = 0,
+
+ pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
+ return .{ .batch = self };
+ }
+ };
+
+ pub const queue_padding_length = cache_line_length / 2;
+
+ back: ?*T align(queue_padding_length) = null,
+ count: usize = 0,
+ front: T align(queue_padding_length) = init: {
+ var stub: T = undefined;
+ @field(stub, next) = null;
+ break :init stub;
+ },
+
+ pub fn push(self: *Self, src: *T) void {
+ assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);
+
+ @field(src, next) = null;
+ const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
+ @field(old_back, next) = src;
+ }
+
+ pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
+ assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);
+
+ @field(last, next) = null;
+ const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
+ @field(old_back, next) = first;
+ }
+
+ pub fn pop(self: *Self) ?*T {
+ const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;
+ if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+ const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+ if (first != last) return null;
+ @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+ if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+ var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+ while (next_item == null) : (atomic.spinLoopHint()) {
+ next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
+ }
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
+ return first;
+ }
+
+ pub fn popBatch(self: *Self) Self.Batch {
+ var batch: Self.Batch = .{};
+
+ var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
+ batch.front = front;
+
+ var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
+ batch.count += 1;
+ batch.last = front;
+
+ front = next_node;
+ }
+
+ const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
+ if (front != last) {
+ @atomicStore(?*T, &@field(self.front, next), front, .Release);
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ @atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
+ if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
+ batch.count += 1;
+ batch.last = front;
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ while (next_item == null) : (atomic.spinLoopHint()) {
+ next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
+ }
+
+ batch.count += 1;
+ @atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
+ batch.last = front;
+ assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
+ return batch;
+ }
+
+ pub fn peek(self: *Self) usize {
+ const count = @atomicLoad(usize, &self.count, .Acquire);
+ assert(count >= 0);
+ return count;
+ }
+
+ pub fn isEmpty(self: *Self) bool {
+ return self.peek() == 0;
+ }
+ };
+}
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 6c1fc49f9..2ef8225f3 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -515,90 +515,79 @@ pub const Fetch = struct {
);
pub const FetchTasklet = struct {
- promise: *JSPromise = undefined,
- http: HTTPClient.AsyncHTTP = undefined,
- result: HTTPClient.HTTPClientResult = undefined,
- status: Status = Status.pending,
+ http: ?*HTTPClient.AsyncHTTP = null,
+ result: HTTPClient.HTTPClientResult = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
-
- empty_request_body: MutableString = undefined,
-
- context: FetchTaskletContext = undefined,
+ request_body: Blob = undefined,
response_buffer: MutableString = undefined,
-
- blob_store: ?*Blob.Store = null,
-
- const Pool = ObjectPool(FetchTasklet, init, true, 32);
- const BodyPool = ObjectPool(MutableString, MutableString.init2048, true, 8);
- pub const FetchTaskletContext = struct {
- tasklet: *FetchTasklet,
- };
+ request_headers: Headers = Headers{ .allocator = undefined },
+ ref: *JSC.napi.Ref = undefined,
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
- pub const Status = enum(u8) {
- pending,
- running,
- done,
- };
+ fn clearData(this: *FetchTasklet) void {
+ this.request_headers.entries.deinit(bun.default_allocator);
+ this.request_headers.buf.deinit(bun.default_allocator);
+ this.request_headers = Headers{ .allocator = undefined };
+ this.http.?.deinit();
+
+ this.result.deinitMetadata();
+ this.response_buffer.deinit();
+ this.request_body.detach();
+ }
+
+ pub fn deinit(this: *FetchTasklet) void {
+ if (this.http) |http| this.javascript_vm.allocator.destroy(http);
+ this.javascript_vm.allocator.destroy(this);
+ }
pub fn onDone(this: *FetchTasklet) void {
if (comptime JSC.is_bindgen)
unreachable;
const globalThis = this.global_this;
- const promise = this.promise;
- const state = this.http.state.load(.Monotonic);
- const result = switch (state) {
- .success => this.onResolve(),
- .fail => this.onReject(),
- else => unreachable,
+
+ var ref = this.ref;
+ const promise_value = ref.get(globalThis);
+ defer ref.destroy(globalThis);
+
+ if (promise_value.isEmptyOrUndefinedOrNull()) {
+ this.clearData();
+ return;
+ }
+
+ var promise = promise_value.asPromise().?;
+
+ const success = this.result.isSuccess();
+ const result = switch (success) {
+ true => this.onResolve(),
+ false => this.onReject(),
};
- this.release();
- const promise_value = promise.asValue(globalThis);
- promise_value.unprotect();
+ this.clearData();
- switch (state) {
- .success => {
+ promise_value.ensureStillAlive();
+
+ switch (success) {
+ true => {
promise.resolve(globalThis, result);
},
- .fail => {
+ false => {
promise.reject(globalThis, result);
},
- else => unreachable,
}
}
- pub fn reset(_: *FetchTasklet) void {}
-
- pub fn release(this: *FetchTasklet) void {
- this.global_this = undefined;
- this.javascript_vm = undefined;
- this.promise = undefined;
- this.status = Status.pending;
- // var pooled = this.pooled_body;
- // BodyPool.release(pooled);
- // this.pooled_body = undefined;
- this.http = undefined;
-
- Pool.release(@fieldParentPtr(Pool.Node, "data", this));
- }
-
pub fn onReject(this: *FetchTasklet) JSValue {
- if (this.blob_store) |store| {
- this.blob_store = null;
- store.deref();
- }
- defer this.result.deinitMetadata();
const fetch_error = std.fmt.allocPrint(
default_allocator,
"fetch() failed {s}\nurl: \"{s}\"",
.{
- @errorName(this.http.err orelse error.HTTPFail),
+ this.result.fail,
this.result.href,
},
) catch unreachable;
@@ -606,26 +595,27 @@ pub const Fetch = struct {
}
pub fn onResolve(this: *FetchTasklet) JSValue {
- var allocator = default_allocator;
- var http_response = this.http.response.?;
+ var allocator = this.global_this.bunVM().allocator;
+ const http_response = this.result.response;
var response = allocator.create(Response) catch unreachable;
- if (this.blob_store) |store| {
- this.blob_store = null;
- store.deref();
- }
- defer this.result.deinitMetadata();
+ const blob = Blob.init(this.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this);
+ this.response_buffer = .{ .allocator = default_allocator, .list = .{
+ .items = &.{},
+ .capacity = 0,
+ } };
+
response.* = Response{
.allocator = allocator,
.url = allocator.dupe(u8, this.result.href) catch unreachable,
.status_text = allocator.dupe(u8, http_response.status) catch unreachable,
- .redirected = this.http.redirected,
+ .redirected = this.result.redirected,
.body = .{
.init = .{
.headers = FetchHeaders.createFromPicoHeaders(this.global_this, http_response.headers),
.status_code = @truncate(u16, http_response.status_code),
},
.value = .{
- .Blob = Blob.init(this.http.response_buffer.toOwnedSliceLeaky(), allocator, this.global_this),
+ .Blob = blob,
},
},
};
@@ -636,38 +626,50 @@ pub const Fetch = struct {
allocator: std.mem.Allocator,
method: Method,
url: ZigURL,
- headers: Headers.Entries,
- headers_buf: string,
- request_body: ?*MutableString,
+ headers: Headers,
+ request_body: Blob,
timeout: usize,
- request_body_store: ?*Blob.Store,
- ) !*FetchTasklet.Pool.Node {
- var linked_list = FetchTasklet.Pool.get(allocator);
- linked_list.data.javascript_vm = VirtualMachine.vm;
- linked_list.data.empty_request_body = MutableString.init(allocator, 0) catch unreachable;
- // linked_list.data.pooled_body = BodyPool.get(allocator);
- linked_list.data.blob_store = request_body_store;
- linked_list.data.response_buffer = MutableString.initEmpty(allocator);
- linked_list.data.http = HTTPClient.AsyncHTTP.init(
+ globalThis: *JSC.JSGlobalObject,
+ promise: JSValue,
+ ) !*FetchTasklet {
+ var jsc_vm = globalThis.bunVM();
+ var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet);
+ if (request_body.store) |store| {
+ store.ref();
+ }
+
+ fetch_tasklet.* = .{
+ .response_buffer = MutableString{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
+ .http = try jsc_vm.allocator.create(HTTPClient.AsyncHTTP),
+ .javascript_vm = jsc_vm,
+ .request_body = request_body,
+ .global_this = globalThis,
+ .request_headers = headers,
+ .ref = JSC.napi.Ref.create(globalThis, promise),
+ };
+ fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
allocator,
method,
url,
- headers,
- headers_buf,
- &linked_list.data.response_buffer,
- request_body orelse &linked_list.data.empty_request_body,
+ headers.entries,
+ headers.buf.items,
+ &fetch_tasklet.response_buffer,
+ request_body.sharedView(),
timeout,
- undefined,
- );
- linked_list.data.context = .{ .tasklet = &linked_list.data };
- linked_list.data.http.completion_callback = HTTPClient.HTTPClientResult.Callback.New(
- *FetchTasklet,
- FetchTasklet.callback,
- ).init(
- &linked_list.data,
+ HTTPClient.HTTPClientResult.Callback.New(
+ *FetchTasklet,
+ FetchTasklet.callback,
+ ).init(
+ fetch_tasklet,
+ ),
);
-
- return linked_list;
+ return fetch_tasklet;
}
pub fn queue(
@@ -675,27 +677,36 @@ pub const Fetch = struct {
global: *JSGlobalObject,
method: Method,
url: ZigURL,
- headers: Headers.Entries,
- headers_buf: string,
- request_body: ?*MutableString,
+ headers: Headers,
+ request_body: Blob,
timeout: usize,
- request_body_store: ?*Blob.Store,
- ) !*FetchTasklet.Pool.Node {
+ promise: JSValue,
+ ) !*FetchTasklet {
try HTTPClient.HTTPThread.init();
- var node = try get(allocator, method, url, headers, headers_buf, request_body, timeout, request_body_store);
+ var node = try get(
+ allocator,
+ method,
+ url,
+ headers,
+ request_body,
+ timeout,
+ global,
+ promise,
+ );
- node.data.global_this = global;
var batch = NetworkThread.Batch{};
- node.data.http.schedule(allocator, &batch);
- HTTPClient.http_thread.schedule(batch);
+ node.http.?.schedule(allocator, &batch);
VirtualMachine.vm.active_tasks +|= 1;
+
+ HTTPClient.http_thread.schedule(batch);
+
return node;
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
task.response_buffer = result.body.?.*;
task.result = result;
- task.javascript_vm.eventLoop().enqueueTaskConcurrent(Task.init(task));
+ task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task));
}
};
@@ -715,12 +726,11 @@ pub const Fetch = struct {
}
var headers: ?Headers = null;
- var body: MutableString = MutableString.initEmpty(bun.default_allocator);
var method = Method.GET;
var args = JSC.Node.ArgumentsSlice.from(ctx.bunVM(), arguments);
var url: ZigURL = undefined;
var first_arg = args.nextEat().?;
- var blob_store: ?*Blob.Store = null;
+ var body: Blob = Blob.initEmpty(ctx);
if (first_arg.isString()) {
var url_zig_str = ZigString.init("");
JSValue.fromRef(arguments[0]).toZigString(&url_zig_str, globalThis);
@@ -737,7 +747,6 @@ pub const Fetch = struct {
url_str = getAllocator(ctx).dupe(u8, url_str) catch unreachable;
}
- NetworkThread.init() catch @panic("Failed to start network thread");
url = ZigURL.parse(url_str);
if (arguments.len >= 2 and arguments[1].?.value().isObject()) {
@@ -760,18 +769,7 @@ pub const Fetch = struct {
if (options.fastGet(ctx.ptr(), .body)) |body__| {
if (Blob.fromJS(ctx.ptr(), body__, true, false)) |new_blob| {
- if (new_blob.size > 0) {
- body = MutableString{
- .list = std.ArrayListUnmanaged(u8){
- .items = bun.constStrToU8(new_blob.sharedView()),
- .capacity = new_blob.size,
- },
- .allocator = bun.default_allocator,
- };
- blob_store = new_blob.store;
- }
- // transfer is unnecessary here because this is a new slice
- //new_blob.transfer();
+ body = new_blob;
} else |_| {
return JSPromise.rejectedPromiseValue(globalThis, ZigString.init("fetch() received invalid body").toErrorInstance(globalThis)).asRef();
}
@@ -783,54 +781,28 @@ pub const Fetch = struct {
if (request.headers) |head| {
headers = Headers.from(head, bun.default_allocator) catch unreachable;
}
- var blob = request.body.use();
- // TODO: make RequestBody _NOT_ a MutableString
- body = MutableString{
- .list = std.ArrayListUnmanaged(u8){
- .items = bun.constStrToU8(blob.sharedView()),
- .capacity = bun.constStrToU8(blob.sharedView()).len,
- },
- .allocator = blob.allocator orelse bun.default_allocator,
- };
- blob_store = blob.store;
+ body = request.body.use();
} else {
const fetch_error = fetch_type_error_strings.get(js.JSValueGetType(ctx, arguments[0]));
return JSPromise.rejectedPromiseValue(globalThis, ZigString.init(fetch_error).toErrorInstance(globalThis)).asRef();
}
- var header_entries: Headers.Entries = .{};
- var header_buf: string = "";
-
- if (headers) |head| {
- header_entries = head.entries;
- header_buf = head.buf.items;
- }
-
- var request_body: ?*MutableString = null;
- if (body.list.items.len > 0) {
- var mutable = bun.default_allocator.create(MutableString) catch unreachable;
- mutable.* = body;
- request_body = mutable;
- }
+ var deferred_promise = JSC.C.JSObjectMakeDeferredPromise(globalThis, null, null, null);
// var resolve = FetchTasklet.FetchResolver.Class.make(ctx: js.JSContextRef, ptr: *ZigType)
- var queued = FetchTasklet.queue(
+ _ = FetchTasklet.queue(
default_allocator,
globalThis,
method,
url,
- header_entries,
- header_buf,
- request_body,
+ headers orelse Headers{
+ .allocator = bun.default_allocator,
+ },
+ body,
std.time.ns_per_hour,
- blob_store,
+ JSC.JSValue.fromRef(deferred_promise),
) catch unreachable;
- const promise = JSC.JSPromise.create(ctx);
- queued.data.promise = promise;
- const promise_value = promise.asValue(ctx);
- promise_value.protect();
-
- return promise_value.asObjectRef();
+ return deferred_promise;
}
};
@@ -1991,7 +1963,7 @@ pub const Blob = struct {
}
}
pub fn run(this: *ReadFile, task: *ReadFileTask) void {
- var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
+ var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
_ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task });
}
@@ -2020,7 +1992,7 @@ pub const Blob = struct {
task.onFinish();
suspend {
- HTTPClient.getAllocator().destroy(@frame());
+ bun.default_allocator.destroy(@frame());
}
}
@@ -2236,7 +2208,7 @@ pub const Blob = struct {
cb(cb_ctx, .{ .result = @truncate(SizeType, wrote) });
}
pub fn run(this: *WriteFile, task: *WriteFileTask) void {
- var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
+ var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable;
_ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{ this, task });
}
@@ -2244,7 +2216,7 @@ pub const Blob = struct {
this._runAsync();
task.onFinish();
suspend {
- HTTPClient.getAllocator().destroy(@frame());
+ bun.default_allocator.destroy(@frame());
}
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 242e3db49..1158a8dd2 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2340,6 +2340,7 @@ pub const FileBlobLoader = struct {
read_frame: anyframe = undefined,
chunk_size: Blob.SizeType = 0,
main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null },
+ concurrent_task: JSC.ConcurrentTask = .{},
pub fn taskCallback(task: *NetworkThread.Task) void {
var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
@@ -2475,7 +2476,7 @@ pub const FileBlobLoader = struct {
pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
this.concurrent.main_thread_task.ctx = this;
- this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task));
+ this.loop.enqueueTaskConcurrent(this.concurrent.concurrent_task.from(&this.concurrent.main_thread_task));
}
fn runAsync(this: *FileBlobLoader) void {