diff options
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- | src/bun.js/api/bun.classes.ts | 5 | ||||
-rw-r--r-- | src/bun.js/api/bun.zig | 1 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 159 |
3 files changed, 157 insertions, 8 deletions
diff --git a/src/bun.js/api/bun.classes.ts b/src/bun.js/api/bun.classes.ts index 6d8e80b6d..36f48f790 100644 --- a/src/bun.js/api/bun.classes.ts +++ b/src/bun.js/api/bun.classes.ts @@ -44,6 +44,11 @@ export default [ length: 0, }, + send: { + fn: "doSend", + length: 1, + }, + kill: { fn: "kill", length: 1, diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index f86031caf..36e52821f 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -4426,6 +4426,7 @@ pub const EnvironmentVariables = struct { } return len; } + pub fn getEnvValue(globalObject: *JSC.JSGlobalObject, name: ZigString) ?ZigString { var vm = globalObject.bunVM(); var sliced = name.toSlice(vm.allocator); diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index c3244131d..50bd846ac 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -1,4 +1,3 @@ -const Bun = @This(); const default_allocator = @import("root").bun.default_allocator; const bun = @import("root").bun; const Environment = bun.Environment; @@ -14,6 +13,8 @@ const JSC = @import("root").bun.JSC; const JSValue = JSC.JSValue; const JSGlobalObject = JSC.JSGlobalObject; const Which = @import("../../../which.zig"); +const uws = @import("../../../deps/uws.zig"); +const IPC = @import("../../ipc.zig"); pub const Subprocess = struct { const log = Output.scoped(.Subprocess, false); @@ -61,15 +62,27 @@ pub const Subprocess = struct { is_sync: bool = false, this_jsvalue: JSC.JSValue = .zero, + ipc: IPCMode, + // this is only ever accessed when `ipc` is not `none` + ipc_socket: IPC.Socket = undefined, + ipc_callback: JSC.Strong = .{}, + ipc_buffer: bun.ByteList, + pub const SignalCode = bun.SignalCode; + pub const IPCMode = enum { + none, + bun, + // json, + }; + pub fn hasExited(this: *const Subprocess) bool { return this.exit_code != null or this.waitpid_err != null or this.signal_code != null; } pub fn updateHasPendingActivityFlag(this: *Subprocess) void { @fence(.SeqCst); - this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst); + this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .SeqCst); } pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool { @@ -79,7 +92,7 @@ pub const Subprocess = struct { pub fn updateHasPendingActivity(this: *Subprocess) void { @fence(.Release); - this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .Release); + this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .Release); } pub fn ref(this: *Subprocess) void { @@ -411,6 +424,35 @@ pub const Subprocess = struct { return JSC.JSValue.jsUndefined(); } + pub fn doSend(this: *Subprocess, global: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSValue { + if (this.ipc == .none) { + global.throw("Subprocess.send() can only be used if an IPC channel is open.", .{}); + return .zero; + } + + if (callFrame.argumentsCount() == 0) { + global.throwInvalidArguments("Subprocess.send() requires one argument", .{}); + return .zero; + } + + const value = callFrame.argument(0); + + const success = IPC.serializeJSValueForSubprocess( + global, + value, + this.ipc_socket.fd(), + ); + if (!success) return .zero; + + return JSC.JSValue.jsUndefined(); + } + + pub fn disconnect(this: *Subprocess) void { + if (this.ipc == .none) return; + this.ipc_socket.close(0, null); + this.ipc = .none; + } + pub fn getPid( this: *Subprocess, _: *JSGlobalObject, @@ -1070,8 +1112,11 @@ pub const Subprocess = struct { var PATH = jsc_vm.bundler.env.get("PATH") orelse ""; var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined; var cmd_value = JSValue.zero; - var detached: bool = false; + var detached = false; var args = args_; + var ipc_mode = IPCMode.none; + var ipc_callback: JSValue = .zero; + { if (args.isEmptyOrUndefinedOrNull()) { globalThis.throwInvalidArguments("cmd must be an array", .{}); @@ -1164,7 +1209,11 @@ pub const Subprocess = struct { globalThis.throwInvalidArguments("onExit must be a function or undefined", .{}); return .zero; } - on_exit_callback = onExit_.withAsyncContextIfNeeded(globalThis); + + on_exit_callback = if (comptime is_sync) + onExit_ + else + onExit_.withAsyncContextIfNeeded(globalThis); } } @@ -1186,8 +1235,13 @@ pub const Subprocess = struct { return .zero; }; + // If the env object does not include a $PATH, it must disable path lookup for argv[0] + PATH = ""; + while (object_iter.next()) |key| { var value = object_iter.value; + if (value == .undefined) continue; + var line = std.fmt.allocPrintZ(allocator, "{}={}", .{ key, value.getZigString(globalThis) }) catch { globalThis.throw("out of memory", .{}); return .zero; @@ -1209,7 +1263,7 @@ pub const Subprocess = struct { if (!stdio_val.isEmptyOrUndefinedOrNull()) { if (stdio_val.jsType().isArray()) { var stdio_iter = stdio_val.arrayIterator(globalThis); - stdio_iter.len = @min(stdio_iter.len, 3); + stdio_iter.len = @min(stdio_iter.len, 4); var i: u32 = 0; while (stdio_iter.next()) |value| : (i += 1) { if (!extractStdio(globalThis, i, value, &stdio)) @@ -1250,6 +1304,15 @@ pub const Subprocess = struct { detached = detached_val.toBoolean(); } } + + if (args.get(globalThis, "ipc")) |val| { + if (val.isCell() and val.isCallable(globalThis.vm())) { + // In the future, we should add a way to use a different IPC serialization format, specifically `json`. + // but the only use case this has is doing interop with node.js IPC and other programs. + ipc_mode = .bun; + ipc_callback = val.withAsyncContextIfNeeded(globalThis); + } + } } } @@ -1328,6 +1391,42 @@ pub const Subprocess = struct { return .zero; }; + // IPC is currently implemented in a very limited way. + // + // Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special + // runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets). + // + // Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional + // + // And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows + // the ipc fd to be any number and it just works. But most people only care about the default `.fork()` + // behavior, where this workaround suffices. + // + // When Bun.spawn() is given a `.onMessage` callback, it enables IPC as follows: + var socket: IPC.Socket = undefined; + if (ipc_mode != .none) { + if (comptime is_sync) { + globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{}); + return .zero; + } + + env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn"); + env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3"); + + var fds: [2]uws.LIBUS_SOCKET_DESCRIPTOR = undefined; + socket = uws.newSocketFromPair( + jsc_vm.rareData().spawnIPCContext(jsc_vm), + @sizeOf(*Subprocess), + &fds, + ) orelse { + globalThis.throw("failed to create socket pair: E{s}", .{ + @tagName(bun.sys.getErrno(-1)), + }); + return .zero; + }; + actions.dup2(fds[1], 3) catch |err| return globalThis.handleError(err, "in posix_spawn"); + } + env_array.append(allocator, null) catch { globalThis.throw("out of memory", .{}); return .zero; @@ -1389,7 +1488,6 @@ pub const Subprocess = struct { globalThis.throw("out of memory", .{}); return .zero; }; - // When run synchronously, subprocess isn't garbage collected subprocess.* = Subprocess{ .globalThis = globalThis, @@ -1404,7 +1502,16 @@ pub const Subprocess = struct { .stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size), .on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{}, .is_sync = is_sync, + .ipc = ipc_mode, + // will be assigned in the block below + .ipc_socket = socket, + .ipc_buffer = bun.ByteList{}, + .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined, }; + if (ipc_mode != .none) { + var ptr = socket.ext(*Subprocess); + ptr.?.* = subprocess; + } if (subprocess.stdin == .pipe) { subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin); @@ -1610,6 +1717,7 @@ pub const Subprocess = struct { globalThis: *JSC.JSGlobalObject, this_jsvalue: JSC.JSValue, ) void { + log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 }); defer this.updateHasPendingActivity(); this_jsvalue.ensureStillAlive(); this.has_waitpid_task = false; @@ -1715,7 +1823,6 @@ pub const Subprocess = struct { try actions.dup2(std_fileno, std_fileno); } }, - .ignore => { const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY); try actions.openZ(std_fileno, "/dev/null", flag, 0o664); @@ -1867,10 +1974,46 @@ pub const Subprocess = struct { .held = JSC.Strong.create(array_buffer.value, globalThis), }, }; + return true; } globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{}); return false; } + + pub fn handleIPCMessage( + this: *Subprocess, + message: IPC.DecodedIPCMessage, + ) void { + switch (message) { + // In future versions we can read this in order to detect version mismatches, + // or disable future optimizations if the subprocess is old. + .version => |v| { + IPC.log("Child IPC version is {d}", .{v}); + }, + .data => |data| { + IPC.log("Received IPC message from child", .{}); + if (this.ipc_callback.get()) |cb| { + const result = cb.callWithThis( + this.globalThis, + this.this_jsvalue, + &[_]JSValue{data}, + ); + data.ensureStillAlive(); + if (result.isAnyError()) { + this.globalThis.bunVM().onUnhandledError(this.globalThis, result); + } + } + }, + } + } + + pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void { + // uSocket is already freed so calling .close() on the socket can segfault + this.ipc = .none; + this.updateHasPendingActivity(); + } + + pub const IPCHandler = IPC.NewIPCHandler(Subprocess); }; |