aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/javascript/jsc/javascript.zig79
-rw-r--r--src/tagged_pointer.zig58
-rw-r--r--src/work_pool.zig32
3 files changed, 148 insertions, 21 deletions
diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig
index 581d184fa..a3a961b52 100644
--- a/src/javascript/jsc/javascript.zig
+++ b/src/javascript/jsc/javascript.zig
@@ -93,6 +93,7 @@ pub const GlobalClasses = [_]type{
// The last item in this array becomes "process.env"
Bun.EnvironmentVariables.Class,
};
+
const Blob = @import("../../blob.zig");
pub const Buffer = MarkedArrayBuffer;
const Lock = @import("../../lock.zig").Lock;
@@ -1004,9 +1005,68 @@ const bun_file_import_path = "/node_modules.server.bun";
const FetchTasklet = Fetch.FetchTasklet;
const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion;
+const WorkPool = @import("../../work_pool.zig");
+pub fn ConcurrentPromiseTask(comptime Context: type) type {
+ return struct {
+ const This = @This();
+ ctx: *Context,
+ task: WorkPool.Task = .{ .callback = runFromThreadPool },
+ event_loop: *VirtualMachine.EventLoop,
+ allocator: std.mem.Allocator,
+ promise: JSValue,
+
+ pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This {
+ var this = try allocator.create(This);
+ this.* = .{
+ .event_loop = VirtualMachine.vm.event_loop,
+ .ctx = value,
+ .allocator = allocator,
+ .promise = JSValue.createInternalPromise(globalThis),
+ };
+ js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef());
+ return this;
+ }
+
+ pub fn runFromThreadPool(task: *WorkPool.Task) void {
+ var this = @fieldParentPtr(This, "task", task);
+ Context.run(this.ctx);
+ this.onFinish();
+ }
+
+ pub fn runFromJS(this: This) void {
+ var promise_value = this.promise;
+ var promise = promise_value.asInternalPromise() orelse {
+ if (comptime @hasDecl(Context, "deinit")) {
+ @call(.{}, Context.deinit, .{this.ctx});
+ }
+ return;
+ };
+
+ var ctx = this.ctx;
+
+ js.JSValueUnprotect(VirtualMachine.vm.global.ref(), promise_value.asObjectRef());
+ ctx.then(promise);
+ }
+
+ pub fn schedule(this: *This) void {
+ WorkPool.schedule(&this.task);
+ }
+
+ pub fn onFinish(this: *This) void {
+ this.event_loop.enqueueTaskConcurrent(Task.init(this));
+ }
+
+ pub fn deinit(this: *This) void {
+ this.allocator.destroy(this);
+ }
+ };
+}
+const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask;
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
Microtask,
+ AsyncTransformTask,
+
// TimeoutTasklet,
});
@@ -1080,6 +1140,12 @@ pub const VirtualMachine = struct {
fetch_task.onDone();
finished += 1;
},
+ @field(Task.Tag, @typeName(AsyncTransformTask)) => {
+ var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
+ transform_task.*.runFromJS();
+ transform_task.deinit();
+ finished += 1;
+ },
else => unreachable,
}
}
@@ -1095,13 +1161,18 @@ pub const VirtualMachine = struct {
if (this.ready_tasks_count.load(.Monotonic) > 0) {
this.concurrent_lock.lock();
defer this.concurrent_lock.unlock();
- var add: u32 = 0;
+ const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength());
// TODO: optimzie
- this.tasks.ensureUnusedCapacity(this.concurrent_tasks.readableLength()) catch unreachable;
- while (this.concurrent_tasks.readItem()) |task| {
- this.tasks.writeItemAssumeCapacity(task);
+ this.tasks.ensureUnusedCapacity(add) catch unreachable;
+
+ {
+ @fence(.SeqCst);
+ while (this.concurrent_tasks.readItem()) |task| {
+ this.tasks.writeItemAssumeCapacity(task);
+ }
}
+
_ = this.pending_tasks_count.fetchAdd(add, .Monotonic);
_ = this.ready_tasks_count.fetchSub(add, .Monotonic);
}
diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig
index bb094f6d5..d2b9a0ea6 100644
--- a/src/tagged_pointer.zig
+++ b/src/tagged_pointer.zig
@@ -52,30 +52,54 @@ pub const TaggedPointer = packed struct {
pub fn TaggedPointerUnion(comptime Types: anytype) type {
const TagType: type = tag_break: {
- var enumFields: [Types.len]std.builtin.TypeInfo.EnumField = undefined;
- var decls = [_]std.builtin.TypeInfo.Declaration{};
+ if (std.meta.trait.isIndexable(@TypeOf(Types))) {
+ var enumFields: [Types.len]std.builtin.TypeInfo.EnumField = undefined;
+ var decls = [_]std.builtin.TypeInfo.Declaration{};
+
+ inline for (Types) |field, i| {
+ enumFields[i] = .{
+ .name = @typeName(field),
+ .value = 1024 - i,
+ };
+ }
- inline for (Types) |field, i| {
- enumFields[i] = .{
- .name = @typeName(field),
- .value = 1024 - i,
- };
- }
+ break :tag_break @Type(.{
+ .Enum = .{
+ .layout = .Auto,
+ .tag_type = TagSize,
+ .fields = &enumFields,
+ .decls = &decls,
+ .is_exhaustive = false,
+ },
+ });
+ } else {
+ const Fields: []const std.builtin.TypeInfo.StructField = std.meta.fields(@TypeOf(Types));
+ var enumFields: [Fields.len]std.builtin.TypeInfo.EnumField = undefined;
+ var decls = [_]std.builtin.TypeInfo.Declaration{};
+
+ inline for (Fields) |field, i| {
+ enumFields[i] = .{
+ .name = @typeName(field.default_value.?),
+ .value = 1024 - i,
+ };
+ }
- break :tag_break @Type(.{
- .Enum = .{
- .layout = .Auto,
- .tag_type = TagSize,
- .fields = &enumFields,
- .decls = &decls,
- .is_exhaustive = false,
- },
- });
+ break :tag_break @Type(.{
+ .Enum = .{
+ .layout = .Auto,
+ .tag_type = TagSize,
+ .fields = &enumFields,
+ .decls = &decls,
+ .is_exhaustive = false,
+ },
+ });
+ }
};
return struct {
pub const Tag = TagType;
repr: TaggedPointer,
+
const This = @This();
fn assert_type(comptime Type: type) void {
if (!comptime @hasField(Tag, @typeName(Type))) {
diff --git a/src/work_pool.zig b/src/work_pool.zig
new file mode 100644
index 000000000..090f74216
--- /dev/null
+++ b/src/work_pool.zig
@@ -0,0 +1,32 @@
+const ThreadPool = @import("thread_pool");
+const std = @import("std");
+var pool: ThreadPool = undefined;
+var loaded: bool = false;
+
+pub const Batch = ThreadPool.Batch;
+pub const Task = ThreadPool.Task;
+
+fn create() *ThreadPool {
+ @setCold(true);
+
+ pool = ThreadPool.init(.{
+ .max_threads = @floatToInt(u32, @floor(@intToFloat(f32, @maximum(std.Thread.getCpuCount() catch 0, 2)) * 0.8)),
+ .stack_size = 2 * 1024 * 1024,
+ });
+ return &pool;
+}
+pub inline fn get() *ThreadPool {
+ // lil racy
+ if (loaded) return &pool;
+ loaded = true;
+
+ return create();
+}
+
+pub fn scheduleBatch(batch: ThreadPool.Batch) void {
+ get().schedule(batch);
+}
+
+pub fn schedule(task: *ThreadPool.Task) void {
+ get().schedule(ThreadPool.Batch.from(task));
+}