diff options
author | 2022-01-21 03:37:05 -0800 | |
---|---|---|
committer | 2022-01-21 03:37:05 -0800 | |
commit | 8d623e21b672065f0ad29c5183f56761fec37891 (patch) | |
tree | 75383de4772ca16f7f09d2e17a000e4268548b68 /src | |
parent | 7a87e41ab849838346562f7f65e28d79783bbb29 (diff) | |
download | bun-8d623e21b672065f0ad29c5183f56761fec37891.tar.gz bun-8d623e21b672065f0ad29c5183f56761fec37891.tar.zst bun-8d623e21b672065f0ad29c5183f56761fec37891.zip |
[Bun.js] Add a threadpool for doing CPU-bound work
Diffstat (limited to 'src')
-rw-r--r-- | src/javascript/jsc/javascript.zig | 79 | ||||
-rw-r--r-- | src/tagged_pointer.zig | 58 | ||||
-rw-r--r-- | src/work_pool.zig | 32 |
3 files changed, 148 insertions, 21 deletions
diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig index 581d184fa..a3a961b52 100644 --- a/src/javascript/jsc/javascript.zig +++ b/src/javascript/jsc/javascript.zig @@ -93,6 +93,7 @@ pub const GlobalClasses = [_]type{ // The last item in this array becomes "process.env" Bun.EnvironmentVariables.Class, }; + const Blob = @import("../../blob.zig"); pub const Buffer = MarkedArrayBuffer; const Lock = @import("../../lock.zig").Lock; @@ -1004,9 +1005,68 @@ const bun_file_import_path = "/node_modules.server.bun"; const FetchTasklet = Fetch.FetchTasklet; const TaggedPointerUnion = @import("../../tagged_pointer.zig").TaggedPointerUnion; +const WorkPool = @import("../../work_pool.zig"); +pub fn ConcurrentPromiseTask(comptime Context: type) type { + return struct { + const This = @This(); + ctx: *Context, + task: WorkPool.Task = .{ .callback = runFromThreadPool }, + event_loop: *VirtualMachine.EventLoop, + allocator: std.mem.Allocator, + promise: JSValue, + + pub fn createOnJSThread(allocator: std.mem.Allocator, globalThis: *JSGlobalObject, value: *Context) !*This { + var this = try allocator.create(This); + this.* = .{ + .event_loop = VirtualMachine.vm.event_loop, + .ctx = value, + .allocator = allocator, + .promise = JSValue.createInternalPromise(globalThis), + }; + js.JSValueProtect(globalThis.ref(), this.promise.asObjectRef()); + return this; + } + + pub fn runFromThreadPool(task: *WorkPool.Task) void { + var this = @fieldParentPtr(This, "task", task); + Context.run(this.ctx); + this.onFinish(); + } + + pub fn runFromJS(this: This) void { + var promise_value = this.promise; + var promise = promise_value.asInternalPromise() orelse { + if (comptime @hasDecl(Context, "deinit")) { + @call(.{}, Context.deinit, .{this.ctx}); + } + return; + }; + + var ctx = this.ctx; + + js.JSValueUnprotect(VirtualMachine.vm.global.ref(), promise_value.asObjectRef()); + ctx.then(promise); + } + + pub fn schedule(this: *This) void { + WorkPool.schedule(&this.task); + } + + pub fn onFinish(this: *This) void { + this.event_loop.enqueueTaskConcurrent(Task.init(this)); + } + + pub fn deinit(this: *This) void { + this.allocator.destroy(this); + } + }; +} +const AsyncTransformTask = @import("./api/transpiler.zig").TransformTask.AsyncTransformTask; pub const Task = TaggedPointerUnion(.{ FetchTasklet, Microtask, + AsyncTransformTask, + // TimeoutTasklet, }); @@ -1080,6 +1140,12 @@ pub const VirtualMachine = struct { fetch_task.onDone(); finished += 1; }, + @field(Task.Tag, @typeName(AsyncTransformTask)) => { + var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?; + transform_task.*.runFromJS(); + transform_task.deinit(); + finished += 1; + }, else => unreachable, } } @@ -1095,13 +1161,18 @@ pub const VirtualMachine = struct { if (this.ready_tasks_count.load(.Monotonic) > 0) { this.concurrent_lock.lock(); defer this.concurrent_lock.unlock(); - var add: u32 = 0; + const add: u32 = @truncate(u32, this.concurrent_tasks.readableLength()); // TODO: optimzie - this.tasks.ensureUnusedCapacity(this.concurrent_tasks.readableLength()) catch unreachable; - while (this.concurrent_tasks.readItem()) |task| { - this.tasks.writeItemAssumeCapacity(task); + this.tasks.ensureUnusedCapacity(add) catch unreachable; + + { + @fence(.SeqCst); + while (this.concurrent_tasks.readItem()) |task| { + this.tasks.writeItemAssumeCapacity(task); + } } + _ = this.pending_tasks_count.fetchAdd(add, .Monotonic); _ = this.ready_tasks_count.fetchSub(add, .Monotonic); } diff --git a/src/tagged_pointer.zig b/src/tagged_pointer.zig index bb094f6d5..d2b9a0ea6 100644 --- a/src/tagged_pointer.zig +++ b/src/tagged_pointer.zig @@ -52,30 +52,54 @@ pub const TaggedPointer = packed struct { pub fn TaggedPointerUnion(comptime Types: anytype) type { const TagType: type = tag_break: { - var enumFields: [Types.len]std.builtin.TypeInfo.EnumField = undefined; - var decls = [_]std.builtin.TypeInfo.Declaration{}; + if (std.meta.trait.isIndexable(@TypeOf(Types))) { + var enumFields: [Types.len]std.builtin.TypeInfo.EnumField = undefined; + var decls = [_]std.builtin.TypeInfo.Declaration{}; + + inline for (Types) |field, i| { + enumFields[i] = .{ + .name = @typeName(field), + .value = 1024 - i, + }; + } - inline for (Types) |field, i| { - enumFields[i] = .{ - .name = @typeName(field), - .value = 1024 - i, - }; - } + break :tag_break @Type(.{ + .Enum = .{ + .layout = .Auto, + .tag_type = TagSize, + .fields = &enumFields, + .decls = &decls, + .is_exhaustive = false, + }, + }); + } else { + const Fields: []const std.builtin.TypeInfo.StructField = std.meta.fields(@TypeOf(Types)); + var enumFields: [Fields.len]std.builtin.TypeInfo.EnumField = undefined; + var decls = [_]std.builtin.TypeInfo.Declaration{}; + + inline for (Fields) |field, i| { + enumFields[i] = .{ + .name = @typeName(field.default_value.?), + .value = 1024 - i, + }; + } - break :tag_break @Type(.{ - .Enum = .{ - .layout = .Auto, - .tag_type = TagSize, - .fields = &enumFields, - .decls = &decls, - .is_exhaustive = false, - }, - }); + break :tag_break @Type(.{ + .Enum = .{ + .layout = .Auto, + .tag_type = TagSize, + .fields = &enumFields, + .decls = &decls, + .is_exhaustive = false, + }, + }); + } }; return struct { pub const Tag = TagType; repr: TaggedPointer, + const This = @This(); fn assert_type(comptime Type: type) void { if (!comptime @hasField(Tag, @typeName(Type))) { diff --git a/src/work_pool.zig b/src/work_pool.zig new file mode 100644 index 000000000..090f74216 --- /dev/null +++ b/src/work_pool.zig @@ -0,0 +1,32 @@ +const ThreadPool = @import("thread_pool"); +const std = @import("std"); +var pool: ThreadPool = undefined; +var loaded: bool = false; + +pub const Batch = ThreadPool.Batch; +pub const Task = ThreadPool.Task; + +fn create() *ThreadPool { + @setCold(true); + + pool = ThreadPool.init(.{ + .max_threads = @floatToInt(u32, @floor(@intToFloat(f32, @maximum(std.Thread.getCpuCount() catch 0, 2)) * 0.8)), + .stack_size = 2 * 1024 * 1024, + }); + return &pool; +} +pub inline fn get() *ThreadPool { + // lil racy + if (loaded) return &pool; + loaded = true; + + return create(); +} + +pub fn scheduleBatch(batch: ThreadPool.Batch) void { + get().schedule(batch); +} + +pub fn schedule(task: *ThreadPool.Task) void { + get().schedule(ThreadPool.Batch.from(task)); +} |