aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api')
-rw-r--r--src/bun.js/api/bun.classes.ts5
-rw-r--r--src/bun.js/api/bun.zig1
-rw-r--r--src/bun.js/api/bun/subprocess.zig159
3 files changed, 157 insertions, 8 deletions
diff --git a/src/bun.js/api/bun.classes.ts b/src/bun.js/api/bun.classes.ts
index 6d8e80b6d..36f48f790 100644
--- a/src/bun.js/api/bun.classes.ts
+++ b/src/bun.js/api/bun.classes.ts
@@ -44,6 +44,11 @@ export default [
length: 0,
},
+ send: {
+ fn: "doSend",
+ length: 1,
+ },
+
kill: {
fn: "kill",
length: 1,
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index f86031caf..36e52821f 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -4426,6 +4426,7 @@ pub const EnvironmentVariables = struct {
}
return len;
}
+
pub fn getEnvValue(globalObject: *JSC.JSGlobalObject, name: ZigString) ?ZigString {
var vm = globalObject.bunVM();
var sliced = name.toSlice(vm.allocator);
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index c3244131d..50bd846ac 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -1,4 +1,3 @@
-const Bun = @This();
const default_allocator = @import("root").bun.default_allocator;
const bun = @import("root").bun;
const Environment = bun.Environment;
@@ -14,6 +13,8 @@ const JSC = @import("root").bun.JSC;
const JSValue = JSC.JSValue;
const JSGlobalObject = JSC.JSGlobalObject;
const Which = @import("../../../which.zig");
+const uws = @import("../../../deps/uws.zig");
+const IPC = @import("../../ipc.zig");
pub const Subprocess = struct {
const log = Output.scoped(.Subprocess, false);
@@ -61,15 +62,27 @@ pub const Subprocess = struct {
is_sync: bool = false,
this_jsvalue: JSC.JSValue = .zero,
+ ipc: IPCMode,
+ // this is only ever accessed when `ipc` is not `none`
+ ipc_socket: IPC.Socket = undefined,
+ ipc_callback: JSC.Strong = .{},
+ ipc_buffer: bun.ByteList,
+
pub const SignalCode = bun.SignalCode;
+ pub const IPCMode = enum {
+ none,
+ bun,
+ // json,
+ };
+
pub fn hasExited(this: *const Subprocess) bool {
return this.exit_code != null or this.waitpid_err != null or this.signal_code != null;
}
pub fn updateHasPendingActivityFlag(this: *Subprocess) void {
@fence(.SeqCst);
- this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst);
+ this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .SeqCst);
}
pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
@@ -79,7 +92,7 @@ pub const Subprocess = struct {
pub fn updateHasPendingActivity(this: *Subprocess) void {
@fence(.Release);
- this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .Release);
+ this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .Release);
}
pub fn ref(this: *Subprocess) void {
@@ -411,6 +424,35 @@ pub const Subprocess = struct {
return JSC.JSValue.jsUndefined();
}
+ pub fn doSend(this: *Subprocess, global: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSValue {
+ if (this.ipc == .none) {
+ global.throw("Subprocess.send() can only be used if an IPC channel is open.", .{});
+ return .zero;
+ }
+
+ if (callFrame.argumentsCount() == 0) {
+ global.throwInvalidArguments("Subprocess.send() requires one argument", .{});
+ return .zero;
+ }
+
+ const value = callFrame.argument(0);
+
+ const success = IPC.serializeJSValueForSubprocess(
+ global,
+ value,
+ this.ipc_socket.fd(),
+ );
+ if (!success) return .zero;
+
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn disconnect(this: *Subprocess) void {
+ if (this.ipc == .none) return;
+ this.ipc_socket.close(0, null);
+ this.ipc = .none;
+ }
+
pub fn getPid(
this: *Subprocess,
_: *JSGlobalObject,
@@ -1070,8 +1112,11 @@ pub const Subprocess = struct {
var PATH = jsc_vm.bundler.env.get("PATH") orelse "";
var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined;
var cmd_value = JSValue.zero;
- var detached: bool = false;
+ var detached = false;
var args = args_;
+ var ipc_mode = IPCMode.none;
+ var ipc_callback: JSValue = .zero;
+
{
if (args.isEmptyOrUndefinedOrNull()) {
globalThis.throwInvalidArguments("cmd must be an array", .{});
@@ -1164,7 +1209,11 @@ pub const Subprocess = struct {
globalThis.throwInvalidArguments("onExit must be a function or undefined", .{});
return .zero;
}
- on_exit_callback = onExit_.withAsyncContextIfNeeded(globalThis);
+
+ on_exit_callback = if (comptime is_sync)
+ onExit_
+ else
+ onExit_.withAsyncContextIfNeeded(globalThis);
}
}
@@ -1186,8 +1235,13 @@ pub const Subprocess = struct {
return .zero;
};
+ // If the env object does not include a $PATH, it must disable path lookup for argv[0]
+ PATH = "";
+
while (object_iter.next()) |key| {
var value = object_iter.value;
+ if (value == .undefined) continue;
+
var line = std.fmt.allocPrintZ(allocator, "{}={}", .{ key, value.getZigString(globalThis) }) catch {
globalThis.throw("out of memory", .{});
return .zero;
@@ -1209,7 +1263,7 @@ pub const Subprocess = struct {
if (!stdio_val.isEmptyOrUndefinedOrNull()) {
if (stdio_val.jsType().isArray()) {
var stdio_iter = stdio_val.arrayIterator(globalThis);
- stdio_iter.len = @min(stdio_iter.len, 3);
+ stdio_iter.len = @min(stdio_iter.len, 4);
var i: u32 = 0;
while (stdio_iter.next()) |value| : (i += 1) {
if (!extractStdio(globalThis, i, value, &stdio))
@@ -1250,6 +1304,15 @@ pub const Subprocess = struct {
detached = detached_val.toBoolean();
}
}
+
+ if (args.get(globalThis, "ipc")) |val| {
+ if (val.isCell() and val.isCallable(globalThis.vm())) {
+ // In the future, we should add a way to use a different IPC serialization format, specifically `json`.
+ // but the only use case this has is doing interop with node.js IPC and other programs.
+ ipc_mode = .bun;
+ ipc_callback = val.withAsyncContextIfNeeded(globalThis);
+ }
+ }
}
}
@@ -1328,6 +1391,42 @@ pub const Subprocess = struct {
return .zero;
};
+ // IPC is currently implemented in a very limited way.
+ //
+ // Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special
+ // runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets).
+ //
+ // Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional
+ //
+ // And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows
+ // the ipc fd to be any number and it just works. But most people only care about the default `.fork()`
+ // behavior, where this workaround suffices.
+ //
+ // When Bun.spawn() is given a `.onMessage` callback, it enables IPC as follows:
+ var socket: IPC.Socket = undefined;
+ if (ipc_mode != .none) {
+ if (comptime is_sync) {
+ globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{});
+ return .zero;
+ }
+
+ env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn");
+ env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3");
+
+ var fds: [2]uws.LIBUS_SOCKET_DESCRIPTOR = undefined;
+ socket = uws.newSocketFromPair(
+ jsc_vm.rareData().spawnIPCContext(jsc_vm),
+ @sizeOf(*Subprocess),
+ &fds,
+ ) orelse {
+ globalThis.throw("failed to create socket pair: E{s}", .{
+ @tagName(bun.sys.getErrno(-1)),
+ });
+ return .zero;
+ };
+ actions.dup2(fds[1], 3) catch |err| return globalThis.handleError(err, "in posix_spawn");
+ }
+
env_array.append(allocator, null) catch {
globalThis.throw("out of memory", .{});
return .zero;
@@ -1389,7 +1488,6 @@ pub const Subprocess = struct {
globalThis.throw("out of memory", .{});
return .zero;
};
-
// When run synchronously, subprocess isn't garbage collected
subprocess.* = Subprocess{
.globalThis = globalThis,
@@ -1404,7 +1502,16 @@ pub const Subprocess = struct {
.stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.is_sync = is_sync,
+ .ipc = ipc_mode,
+ // will be assigned in the block below
+ .ipc_socket = socket,
+ .ipc_buffer = bun.ByteList{},
+ .ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined,
};
+ if (ipc_mode != .none) {
+ var ptr = socket.ext(*Subprocess);
+ ptr.?.* = subprocess;
+ }
if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
@@ -1610,6 +1717,7 @@ pub const Subprocess = struct {
globalThis: *JSC.JSGlobalObject,
this_jsvalue: JSC.JSValue,
) void {
+ log("onExit {d}, code={d}", .{ this.pid, if (this.exit_code) |e| @as(i32, @intCast(e)) else -1 });
defer this.updateHasPendingActivity();
this_jsvalue.ensureStillAlive();
this.has_waitpid_task = false;
@@ -1715,7 +1823,6 @@ pub const Subprocess = struct {
try actions.dup2(std_fileno, std_fileno);
}
},
-
.ignore => {
const flag = if (std_fileno == bun.STDIN_FD) @as(u32, os.O.RDONLY) else @as(u32, std.os.O.WRONLY);
try actions.openZ(std_fileno, "/dev/null", flag, 0o664);
@@ -1867,10 +1974,46 @@ pub const Subprocess = struct {
.held = JSC.Strong.create(array_buffer.value, globalThis),
},
};
+
return true;
}
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{});
return false;
}
+
+ pub fn handleIPCMessage(
+ this: *Subprocess,
+ message: IPC.DecodedIPCMessage,
+ ) void {
+ switch (message) {
+ // In future versions we can read this in order to detect version mismatches,
+ // or disable future optimizations if the subprocess is old.
+ .version => |v| {
+ IPC.log("Child IPC version is {d}", .{v});
+ },
+ .data => |data| {
+ IPC.log("Received IPC message from child", .{});
+ if (this.ipc_callback.get()) |cb| {
+ const result = cb.callWithThis(
+ this.globalThis,
+ this.this_jsvalue,
+ &[_]JSValue{data},
+ );
+ data.ensureStillAlive();
+ if (result.isAnyError()) {
+ this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
+ }
+ }
+ },
+ }
+ }
+
+ pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void {
+ // uSocket is already freed so calling .close() on the socket can segfault
+ this.ipc = .none;
+ this.updateHasPendingActivity();
+ }
+
+ pub const IPCHandler = IPC.NewIPCHandler(Subprocess);
};