diff options
-rw-r--r-- | src/javascript/jsc/javascript.zig | 5 | ||||
-rw-r--r-- | src/napi/napi.zig | 264 |
2 files changed, 254 insertions, 15 deletions
diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig index f6147a1dd..c193f5090 100644 --- a/src/javascript/jsc/javascript.zig +++ b/src/javascript/jsc/javascript.zig @@ -480,11 +480,12 @@ pub const SavedSourceMap = struct { const uws = @import("uws"); pub const AnyTask = struct { - ctx: *anyopaque, + ctx: ?*anyopaque, callback: fn (*anyopaque) void, pub fn run(this: *AnyTask) void { - this.callback(this.ctx); + @setRuntimeSafety(false); + this.callback(this.ctx.?); } pub fn New(comptime Type: type, comptime Callback: anytype) type { diff --git a/src/napi/napi.zig b/src/napi/napi.zig index ca02a2ce1..0e7169f18 100644 --- a/src/napi/napi.zig +++ b/src/napi/napi.zig @@ -2,10 +2,13 @@ const std = @import("std"); const JSC = @import("javascript_core"); const strings = @import("strings"); const bun = @import("../global.zig"); +const Lock = @import("../lock.zig").Lock; const JSValue = JSC.JSValue; const ZigString = JSC.ZigString; const TODO_EXCEPTION: JSC.C.ExceptionRef = null; +const Channel = @import("../sync.zig").Channel; + pub const napi_env = *JSC.JSGlobalObject; pub const napi_ref = struct_napi_ref__; pub const napi_handle_scope = napi_env; @@ -634,6 +637,7 @@ pub export fn napi_get_array_length(env: napi_env, value: napi_value, result: [* return .ok; } pub export fn napi_strict_equals(env: napi_env, lhs: napi_value, rhs: napi_value, result: *bool) napi_status { + // there is some nuance with NaN here i'm not sure about result.* = lhs.isSameValue(rhs, env); return .ok; } @@ -977,17 +981,17 @@ pub const napi_async_work = struct { ); } }; -pub const struct_napi_threadsafe_function__ = opaque {}; -pub const napi_threadsafe_function = ?*struct_napi_threadsafe_function__; -pub const napi_tsfn_release: c_int = 0; -pub const napi_tsfn_abort: c_int = 1; -pub const napi_threadsafe_function_release_mode = c_uint; +pub const napi_threadsafe_function = *ThreadSafeFunction; +pub const napi_threadsafe_function_release_mode = enum(c_uint) { + release = 0, + abort = 1, +}; pub const napi_tsfn_nonblocking: c_int = 0; pub const napi_tsfn_blocking: c_int = 1; pub const napi_threadsafe_function_call_mode = c_uint; pub const napi_async_execute_callback = ?fn (napi_env, ?*anyopaque) callconv(.C) void; pub const napi_async_complete_callback = ?fn (napi_env, napi_status, ?*anyopaque) callconv(.C) void; -pub const napi_threadsafe_function_call_js = ?fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void; +pub const napi_threadsafe_function_call_js = fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void; pub const napi_node_version = extern struct { major: u32, minor: u32, @@ -1141,15 +1145,249 @@ pub export fn napi_remove_env_cleanup_hook(env: napi_env, fun: ?fn (?*anyopaque) return .ok; } + +pub const Finalizer = struct { + fun: napi_finalize, + ctx: ?*anyopaque = null, +}; + +// TODO: generate comptime version of this instead of runtime checking +pub const ThreadSafeFunction = struct { + /// thread-safe functions can be "referenced" and "unreferenced". A + /// "referenced" thread-safe function will cause the event loop on the thread + /// on which it is created to remain alive until the thread-safe function is + /// destroyed. In contrast, an "unreferenced" thread-safe function will not + /// prevent the event loop from exiting. The APIs napi_ref_threadsafe_function + /// and napi_unref_threadsafe_function exist for this purpose. + /// + /// Neither does napi_unref_threadsafe_function mark the thread-safe + /// functions as able to be destroyed nor does napi_ref_threadsafe_function + /// prevent it from being destroyed. + ref_for_process_exit: bool = false, + + owning_threads: std.AutoArrayHashMapUnmanaged(u64) = .{}, + owning_thread_lock: Lock = Lock.init(), + event_loop: *JSC.VirtualMachine.EventLoop, + finalizer: ?*Finalizer = null, + + javascript_function: JSValue, + finalizer_task: JSC.AnyTask = undefined, + finalizer: Finalizer = Finalizer{ .fun = null, .ctx = null }, + channel: Queue, + + ctx: ?*anyopaque = null, + + call_js: ?napi_threadsafe_function_call_js = null, + + const ThreadSafeFunctionTask = JSC.AnyTask.New(@This(), call); + pub const Queue = union(enum) { + sized: Channel(?*anyopaque, .Slice), + unsized: Channel(?*anyopaque, .Slice), + + pub fn isClosed(this: *const @This()) bool { + return @atomicLoad( + bool, + switch (this) { + .sized => &this.size.is_closed, + .unsized => &this.unsized.is_closed, + }, + .SeqCst, + ); + } + + pub fn close(this: *@This()) bool { + switch (this) { + .sized => this.size.close(), + .unsized => this.unsized.close(), + } + } + + pub fn init(size: usize, allocator: std.mem.Allocator) @This() { + switch (size) { + 0 => { + return .{ + .unsized = Channel(?*anyopaque, .Dynamic).init(allocator), + }; + }, + else => { + var slice = allocator.alloc(?*anyopaque, size) catch unreachable; + return .{ + .sized = Channel(?*anyopaque, .Slice).init(slice), + }; + }, + } + } + + pub fn writeItem(this: *@This(), value: ?*anyopaque) !void { + switch (this.*) { + .sized => try this.sized.writeItem(value), + .unsized => try this.unsized.writeItem(value), + } + } + + pub fn readItem(this: *@This()) !?*anyopaque { + switch (this.*) { + .sized => try this.sized.readItem(), + .unsized => try this.unsized.readItem(), + } + } + + pub fn tryWriteItem(this: *@This(), value: ?*anyopaque) !bool { + switch (this.*) { + .sized => try this.sized.tryWriteItem(value), + .unsized => try this.unsized.tryWriteItem(value), + } + } + + pub fn tryReadItem(this: *@This()) !??*anyopaque { + switch (this.*) { + .sized => try this.sized.tryReadItem(), + .unsized => try this.unsized.tryReadItem(), + } + } + }; + + pub fn call(this: *ThreadSafeFunction) void { + var task = this.channel.tryReadItem() catch null orelse return; + if (this.call_js) |cb| { + cb(this.event_loop.global, this.javascript_function, task, this.ctx); + } else { + // TODO: wrapper that reports errors + _ = JSC.C.JSObjectCallAsFunction( + this.event_loop.global.ref(), + this.javascript_function.asObjectRef(), + JSC.JSValue.jsUndefined().asObjectRef(), + 0, + null, + null, + ); + } + } + + pub fn enqueue(this: *ThreadSafeFunction, ctx: ?*anyopaque, block: bool) !void { + if (block) { + try this.channel.writeItem(JSC.AnyTask{ .ctx = ctx, .run = this.call }); + } else { + if (!this.channel.tryWriteItem(JSC.AnyTask{ .ctx = ctx, .run = this.call })) { + return error.WouldBlock; + } + } + + this.event_loop.enqueueTaskConcurrent(ThreadSafeFunction.init(this)); + } + + pub fn finalize(opaq: *anyopaque) void { + var this = bun.cast(*ThreadSafeFunction, opaq); + if (this.finalizer.fun) |fun| { + fun(this.finalizer.ctx); + } + + JSC.C.JSValueUnprotect(this.event_loop.global.ref(), this.javascript_function.asObjectRef()); + bun.default_allocator.destroy(this); + } + + pub fn ref(this: *ThreadSafeFunction) void { + this.ref_for_process_exit = true; + } + + pub fn unref(this: *ThreadSafeFunction) void { + this.ref_for_process_exit = false; + } + + pub fn acquire(this: *ThreadSafeFunction) !void { + this.owning_thread_lock.lock(); + defer this.owning_thread_lock.unlock(); + if (this.channel.isClosed()) + return error.Closed; + _ = this.owning_threads.getOrPut(bun.default_allocator, std.Thread.getCurrentId()) catch unreachable; + } + + pub fn release(this: *ThreadSafeFunction, mode: napi_threadsafe_function_release_mode) void { + this.owning_thread_lock.lock(); + defer this.owning_thread_lock.unlock(); + if (!this.owning_threads.swapRemove(std.Thread.getCurrentId())) + return; + + if (mode == .abort) { + this.channel.close(); + } + + if (this.owning_threads.count() == 0) { + this.finalizer_task = JSC.AnyTask{ .ctx = this, .callback = finalize }; + this.event_loop.enqueueTaskConcurrent(&this.finalizer_task); + return; + } + } +}; pub extern fn napi_open_callback_scope(env: napi_env, resource_object: napi_value, context: napi_async_context, result: [*c]napi_callback_scope) napi_status; pub extern fn napi_close_callback_scope(env: napi_env, scope: napi_callback_scope) napi_status; -pub extern fn napi_create_threadsafe_function(env: napi_env, func: napi_value, async_resource: napi_value, async_resource_name: napi_value, max_queue_size: usize, initial_thread_count: usize, thread_finalize_data: ?*anyopaque, thread_finalize_cb: napi_finalize, context: ?*anyopaque, call_js_cb: napi_threadsafe_function_call_js, result: [*c]napi_threadsafe_function) napi_status; -pub extern fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: [*]*anyopaque) napi_status; -pub extern fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status; -pub extern fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status; -pub extern fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status; -pub extern fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status; -pub extern fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status; +pub export fn napi_create_threadsafe_function( + env: napi_env, + func: napi_value, + _: napi_value, + _: napi_value, + max_queue_size: usize, + initial_thread_count: usize, + thread_finalize_data: ?*anyopaque, + thread_finalize_cb: napi_finalize, + context: ?*anyopaque, + call_js_cb: ?napi_threadsafe_function_call_js, + result: *napi_threadsafe_function, +) napi_status { + // TODO: don't do this + // just have a GC hook for this... + JSC.C.JSValueProtect(env.ref(), func.asObjectRef()); + var function = bun.default_allocator.create(ThreadSafeFunction) catch return .generic_failure; + function.* = .{ + .event_loop = JSC.VirtualMachine.vm.eventLoop(), + .javascript_function = func, + .call_js = call_js_cb, + .ctx = context, + .queue = ThreadSafeFunction.Queue.init(max_queue_size, bun.default_allocator), + .owning_threads = .{}, + }; + function.owning_threads.ensureTotalCapacity(bun.default_allocator, initial_thread_count) catch return .generic_failure; + function.finalizer = .{ .ctx = thread_finalize_data, .fun = thread_finalize_cb }; + result.* = function; + return .ok; +} +pub export fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: *?*anyopaque) napi_status { + result.* = func.ctx; + return .ok; +} +pub export fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status { + func.enqueue(data, is_blocking) catch |err| { + switch (err) { + error.WouldBlock => { + return napi_status.queue_full; + }, + error.Closing => { + return napi_status.closing; + }, + } + }; + return .ok; +} +pub export fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status { + func.acquire() catch return .closing; + return .ok; +} +pub export fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status { + func.release(mode); + return .ok; +} +pub export fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status { + std.debug.assert(func.event_loop.global == env); + + func.unref(); + return .ok; +} +pub export fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status { + std.debug.assert(func.event_loop.global == env); + + func.ref(); + return .ok; +} pub export fn napi_add_async_cleanup_hook(_: napi_env, _: napi_async_cleanup_hook, _: ?*anyopaque, _: [*c]napi_async_cleanup_hook_handle) napi_status { // TODO: |