aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/javascript/jsc/javascript.zig5
-rw-r--r--src/napi/napi.zig264
2 files changed, 254 insertions, 15 deletions
diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig
index f6147a1dd..c193f5090 100644
--- a/src/javascript/jsc/javascript.zig
+++ b/src/javascript/jsc/javascript.zig
@@ -480,11 +480,12 @@ pub const SavedSourceMap = struct {
const uws = @import("uws");
pub const AnyTask = struct {
- ctx: *anyopaque,
+ ctx: ?*anyopaque,
callback: fn (*anyopaque) void,
pub fn run(this: *AnyTask) void {
- this.callback(this.ctx);
+ @setRuntimeSafety(false);
+ this.callback(this.ctx.?);
}
pub fn New(comptime Type: type, comptime Callback: anytype) type {
diff --git a/src/napi/napi.zig b/src/napi/napi.zig
index ca02a2ce1..0e7169f18 100644
--- a/src/napi/napi.zig
+++ b/src/napi/napi.zig
@@ -2,10 +2,13 @@ const std = @import("std");
const JSC = @import("javascript_core");
const strings = @import("strings");
const bun = @import("../global.zig");
+const Lock = @import("../lock.zig").Lock;
const JSValue = JSC.JSValue;
const ZigString = JSC.ZigString;
const TODO_EXCEPTION: JSC.C.ExceptionRef = null;
+const Channel = @import("../sync.zig").Channel;
+
pub const napi_env = *JSC.JSGlobalObject;
pub const napi_ref = struct_napi_ref__;
pub const napi_handle_scope = napi_env;
@@ -634,6 +637,7 @@ pub export fn napi_get_array_length(env: napi_env, value: napi_value, result: [*
return .ok;
}
pub export fn napi_strict_equals(env: napi_env, lhs: napi_value, rhs: napi_value, result: *bool) napi_status {
+ // there is some nuance with NaN here i'm not sure about
result.* = lhs.isSameValue(rhs, env);
return .ok;
}
@@ -977,17 +981,17 @@ pub const napi_async_work = struct {
);
}
};
-pub const struct_napi_threadsafe_function__ = opaque {};
-pub const napi_threadsafe_function = ?*struct_napi_threadsafe_function__;
-pub const napi_tsfn_release: c_int = 0;
-pub const napi_tsfn_abort: c_int = 1;
-pub const napi_threadsafe_function_release_mode = c_uint;
+pub const napi_threadsafe_function = *ThreadSafeFunction;
+pub const napi_threadsafe_function_release_mode = enum(c_uint) {
+ release = 0,
+ abort = 1,
+};
pub const napi_tsfn_nonblocking: c_int = 0;
pub const napi_tsfn_blocking: c_int = 1;
pub const napi_threadsafe_function_call_mode = c_uint;
pub const napi_async_execute_callback = ?fn (napi_env, ?*anyopaque) callconv(.C) void;
pub const napi_async_complete_callback = ?fn (napi_env, napi_status, ?*anyopaque) callconv(.C) void;
-pub const napi_threadsafe_function_call_js = ?fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void;
+pub const napi_threadsafe_function_call_js = fn (napi_env, napi_value, ?*anyopaque, ?*anyopaque) callconv(.C) void;
pub const napi_node_version = extern struct {
major: u32,
minor: u32,
@@ -1141,15 +1145,249 @@ pub export fn napi_remove_env_cleanup_hook(env: napi_env, fun: ?fn (?*anyopaque)
return .ok;
}
+
+pub const Finalizer = struct {
+ fun: napi_finalize,
+ ctx: ?*anyopaque = null,
+};
+
+// TODO: generate comptime version of this instead of runtime checking
+pub const ThreadSafeFunction = struct {
+ /// thread-safe functions can be "referenced" and "unreferenced". A
+ /// "referenced" thread-safe function will cause the event loop on the thread
+ /// on which it is created to remain alive until the thread-safe function is
+ /// destroyed. In contrast, an "unreferenced" thread-safe function will not
+ /// prevent the event loop from exiting. The APIs napi_ref_threadsafe_function
+ /// and napi_unref_threadsafe_function exist for this purpose.
+ ///
+ /// Neither does napi_unref_threadsafe_function mark the thread-safe
+ /// functions as able to be destroyed nor does napi_ref_threadsafe_function
+ /// prevent it from being destroyed.
+ ref_for_process_exit: bool = false,
+
+ owning_threads: std.AutoArrayHashMapUnmanaged(u64) = .{},
+ owning_thread_lock: Lock = Lock.init(),
+ event_loop: *JSC.VirtualMachine.EventLoop,
+ finalizer: ?*Finalizer = null,
+
+ javascript_function: JSValue,
+ finalizer_task: JSC.AnyTask = undefined,
+ finalizer: Finalizer = Finalizer{ .fun = null, .ctx = null },
+ channel: Queue,
+
+ ctx: ?*anyopaque = null,
+
+ call_js: ?napi_threadsafe_function_call_js = null,
+
+ const ThreadSafeFunctionTask = JSC.AnyTask.New(@This(), call);
+ pub const Queue = union(enum) {
+ sized: Channel(?*anyopaque, .Slice),
+ unsized: Channel(?*anyopaque, .Slice),
+
+ pub fn isClosed(this: *const @This()) bool {
+ return @atomicLoad(
+ bool,
+ switch (this) {
+ .sized => &this.size.is_closed,
+ .unsized => &this.unsized.is_closed,
+ },
+ .SeqCst,
+ );
+ }
+
+ pub fn close(this: *@This()) bool {
+ switch (this) {
+ .sized => this.size.close(),
+ .unsized => this.unsized.close(),
+ }
+ }
+
+ pub fn init(size: usize, allocator: std.mem.Allocator) @This() {
+ switch (size) {
+ 0 => {
+ return .{
+ .unsized = Channel(?*anyopaque, .Dynamic).init(allocator),
+ };
+ },
+ else => {
+ var slice = allocator.alloc(?*anyopaque, size) catch unreachable;
+ return .{
+ .sized = Channel(?*anyopaque, .Slice).init(slice),
+ };
+ },
+ }
+ }
+
+ pub fn writeItem(this: *@This(), value: ?*anyopaque) !void {
+ switch (this.*) {
+ .sized => try this.sized.writeItem(value),
+ .unsized => try this.unsized.writeItem(value),
+ }
+ }
+
+ pub fn readItem(this: *@This()) !?*anyopaque {
+ switch (this.*) {
+ .sized => try this.sized.readItem(),
+ .unsized => try this.unsized.readItem(),
+ }
+ }
+
+ pub fn tryWriteItem(this: *@This(), value: ?*anyopaque) !bool {
+ switch (this.*) {
+ .sized => try this.sized.tryWriteItem(value),
+ .unsized => try this.unsized.tryWriteItem(value),
+ }
+ }
+
+ pub fn tryReadItem(this: *@This()) !??*anyopaque {
+ switch (this.*) {
+ .sized => try this.sized.tryReadItem(),
+ .unsized => try this.unsized.tryReadItem(),
+ }
+ }
+ };
+
+ pub fn call(this: *ThreadSafeFunction) void {
+ var task = this.channel.tryReadItem() catch null orelse return;
+ if (this.call_js) |cb| {
+ cb(this.event_loop.global, this.javascript_function, task, this.ctx);
+ } else {
+ // TODO: wrapper that reports errors
+ _ = JSC.C.JSObjectCallAsFunction(
+ this.event_loop.global.ref(),
+ this.javascript_function.asObjectRef(),
+ JSC.JSValue.jsUndefined().asObjectRef(),
+ 0,
+ null,
+ null,
+ );
+ }
+ }
+
+ pub fn enqueue(this: *ThreadSafeFunction, ctx: ?*anyopaque, block: bool) !void {
+ if (block) {
+ try this.channel.writeItem(JSC.AnyTask{ .ctx = ctx, .run = this.call });
+ } else {
+ if (!this.channel.tryWriteItem(JSC.AnyTask{ .ctx = ctx, .run = this.call })) {
+ return error.WouldBlock;
+ }
+ }
+
+ this.event_loop.enqueueTaskConcurrent(ThreadSafeFunction.init(this));
+ }
+
+ pub fn finalize(opaq: *anyopaque) void {
+ var this = bun.cast(*ThreadSafeFunction, opaq);
+ if (this.finalizer.fun) |fun| {
+ fun(this.finalizer.ctx);
+ }
+
+ JSC.C.JSValueUnprotect(this.event_loop.global.ref(), this.javascript_function.asObjectRef());
+ bun.default_allocator.destroy(this);
+ }
+
+ pub fn ref(this: *ThreadSafeFunction) void {
+ this.ref_for_process_exit = true;
+ }
+
+ pub fn unref(this: *ThreadSafeFunction) void {
+ this.ref_for_process_exit = false;
+ }
+
+ pub fn acquire(this: *ThreadSafeFunction) !void {
+ this.owning_thread_lock.lock();
+ defer this.owning_thread_lock.unlock();
+ if (this.channel.isClosed())
+ return error.Closed;
+ _ = this.owning_threads.getOrPut(bun.default_allocator, std.Thread.getCurrentId()) catch unreachable;
+ }
+
+ pub fn release(this: *ThreadSafeFunction, mode: napi_threadsafe_function_release_mode) void {
+ this.owning_thread_lock.lock();
+ defer this.owning_thread_lock.unlock();
+ if (!this.owning_threads.swapRemove(std.Thread.getCurrentId()))
+ return;
+
+ if (mode == .abort) {
+ this.channel.close();
+ }
+
+ if (this.owning_threads.count() == 0) {
+ this.finalizer_task = JSC.AnyTask{ .ctx = this, .callback = finalize };
+ this.event_loop.enqueueTaskConcurrent(&this.finalizer_task);
+ return;
+ }
+ }
+};
pub extern fn napi_open_callback_scope(env: napi_env, resource_object: napi_value, context: napi_async_context, result: [*c]napi_callback_scope) napi_status;
pub extern fn napi_close_callback_scope(env: napi_env, scope: napi_callback_scope) napi_status;
-pub extern fn napi_create_threadsafe_function(env: napi_env, func: napi_value, async_resource: napi_value, async_resource_name: napi_value, max_queue_size: usize, initial_thread_count: usize, thread_finalize_data: ?*anyopaque, thread_finalize_cb: napi_finalize, context: ?*anyopaque, call_js_cb: napi_threadsafe_function_call_js, result: [*c]napi_threadsafe_function) napi_status;
-pub extern fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: [*]*anyopaque) napi_status;
-pub extern fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status;
-pub extern fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status;
-pub extern fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status;
-pub extern fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status;
-pub extern fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status;
+pub export fn napi_create_threadsafe_function(
+ env: napi_env,
+ func: napi_value,
+ _: napi_value,
+ _: napi_value,
+ max_queue_size: usize,
+ initial_thread_count: usize,
+ thread_finalize_data: ?*anyopaque,
+ thread_finalize_cb: napi_finalize,
+ context: ?*anyopaque,
+ call_js_cb: ?napi_threadsafe_function_call_js,
+ result: *napi_threadsafe_function,
+) napi_status {
+ // TODO: don't do this
+ // just have a GC hook for this...
+ JSC.C.JSValueProtect(env.ref(), func.asObjectRef());
+ var function = bun.default_allocator.create(ThreadSafeFunction) catch return .generic_failure;
+ function.* = .{
+ .event_loop = JSC.VirtualMachine.vm.eventLoop(),
+ .javascript_function = func,
+ .call_js = call_js_cb,
+ .ctx = context,
+ .queue = ThreadSafeFunction.Queue.init(max_queue_size, bun.default_allocator),
+ .owning_threads = .{},
+ };
+ function.owning_threads.ensureTotalCapacity(bun.default_allocator, initial_thread_count) catch return .generic_failure;
+ function.finalizer = .{ .ctx = thread_finalize_data, .fun = thread_finalize_cb };
+ result.* = function;
+ return .ok;
+}
+pub export fn napi_get_threadsafe_function_context(func: napi_threadsafe_function, result: *?*anyopaque) napi_status {
+ result.* = func.ctx;
+ return .ok;
+}
+pub export fn napi_call_threadsafe_function(func: napi_threadsafe_function, data: ?*anyopaque, is_blocking: napi_threadsafe_function_call_mode) napi_status {
+ func.enqueue(data, is_blocking) catch |err| {
+ switch (err) {
+ error.WouldBlock => {
+ return napi_status.queue_full;
+ },
+ error.Closing => {
+ return napi_status.closing;
+ },
+ }
+ };
+ return .ok;
+}
+pub export fn napi_acquire_threadsafe_function(func: napi_threadsafe_function) napi_status {
+ func.acquire() catch return .closing;
+ return .ok;
+}
+pub export fn napi_release_threadsafe_function(func: napi_threadsafe_function, mode: napi_threadsafe_function_release_mode) napi_status {
+ func.release(mode);
+ return .ok;
+}
+pub export fn napi_unref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status {
+ std.debug.assert(func.event_loop.global == env);
+
+ func.unref();
+ return .ok;
+}
+pub export fn napi_ref_threadsafe_function(env: napi_env, func: napi_threadsafe_function) napi_status {
+ std.debug.assert(func.event_loop.global == env);
+
+ func.ref();
+ return .ok;
+}
pub export fn napi_add_async_cleanup_hook(_: napi_env, _: napi_async_cleanup_hook, _: ?*anyopaque, _: [*c]napi_async_cleanup_hook_handle) napi_status {
// TODO: