aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/ipc.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/ipc.zig')
-rw-r--r--src/bun.js/ipc.zig144
1 files changed, 110 insertions, 34 deletions
diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig
index 57fcef75f..a0742a0c4 100644
--- a/src/bun.js/ipc.zig
+++ b/src/bun.js/ipc.zig
@@ -35,6 +35,11 @@ pub const IPCMessageType = enum(u8) {
_,
};
+pub const IPCBuffer = struct {
+ list: bun.ByteList = .{},
+ cursor: u32 = 0,
+};
+
/// Given potentially unfinished buffer `data`, attempt to decode and process a message from it.
/// Returns `NotEnoughBytes` if there werent enough bytes
/// Returns `InvalidFormat` if the message was invalid, probably close the socket in this case
@@ -89,12 +94,74 @@ pub fn decodeIPCMessage(
pub const Socket = uws.NewSocketHandler(false);
+pub const IPCData = struct {
+ socket: Socket,
+ incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well
+ outgoing: IPCBuffer = .{},
+
+ has_written_version: if (Environment.allow_assert) u1 else u0 = 0,
+
+ pub fn writeVersionPacket(this: *IPCData) void {
+ if (Environment.allow_assert) {
+ std.debug.assert(this.has_written_version == 0);
+ }
+ const VersionPacket = extern struct {
+ type: IPCMessageType align(1) = .Version,
+ version: u32 align(1) = ipcVersion,
+ };
+ const bytes = comptime std.mem.asBytes(&VersionPacket{});
+ const n = this.socket.write(bytes, false);
+ if (n != bytes.len) {
+ var list = this.outgoing.list.listManaged(bun.default_allocator);
+ list.appendSlice(bytes) catch @panic("OOM");
+ }
+ if (Environment.allow_assert) {
+ this.has_written_version = 1;
+ }
+ }
+
+ pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool {
+ if (Environment.allow_assert) {
+ std.debug.assert(ipc_data.has_written_version == 1);
+ }
+
+ const serialized = value.serialize(globalThis) orelse return false;
+ defer serialized.deinit();
+
+ const size: u32 = @intCast(serialized.data.len);
+
+ const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size;
+
+ ipc_data.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM");
+ const start_offset = ipc_data.outgoing.list.len;
+
+ ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage));
+ ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size);
+ ipc_data.outgoing.list.appendSliceAssumeCapacity(serialized.data);
+
+ std.debug.assert(ipc_data.outgoing.list.len == start_offset + payload_length);
+
+ if (start_offset == 0) {
+ std.debug.assert(ipc_data.outgoing.cursor == 0);
+
+ const n = ipc_data.socket.write(ipc_data.outgoing.list.ptr[start_offset..payload_length], false);
+ if (n == payload_length) {
+ ipc_data.outgoing.list.len = 0;
+ } else if (n > 0) {
+ ipc_data.outgoing.cursor = @intCast(n);
+ }
+ }
+
+ return true;
+ }
+};
+
/// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers
///
/// `Context` must be a struct that implements this interface:
/// struct {
/// globalThis: ?*JSGlobalObject,
-/// ipc_buffer: bun.ByteList,
+/// ipc: IPCData,
///
/// fn handleIPCMessage(*Context, DecodedIPCMessage) void
/// fn handleIPCClose(*Context, Socket) void
@@ -102,18 +169,18 @@ pub const Socket = uws.NewSocketHandler(false);
pub fn NewIPCHandler(comptime Context: type) type {
return struct {
pub fn onOpen(
- _: *Context,
- socket: Socket,
+ _: *anyopaque,
+ _: Socket,
) void {
- // Write the version message
- const Data = extern struct {
- type: IPCMessageType align(1) = .Version,
- version: u32 align(1) = ipcVersion,
- };
- const data: []const u8 = comptime @as([@sizeOf(Data)]u8, @bitCast(Data{}))[0..];
- _ = socket.write(data, false);
- socket.flush();
+ // it is NOT safe to use the first argument here because it has not been initialized yet.
+ // ideally we would call .ipc.writeVersionPacket() here, and we need that to handle the
+ // theoretical write failure, but since the .ipc.outgoing buffer isn't available, that
+ // data has nowhere to go.
+ //
+ // therefore, initializers of IPC handlers need to call .ipc.writeVersionPacket() themselves
+ // this is covered by an assertion.
}
+
pub fn onClose(
this: *Context,
socket: Socket,
@@ -124,7 +191,7 @@ pub fn NewIPCHandler(comptime Context: type) type {
log("onClose\n", .{});
this.handleIPCClose(socket);
}
- // extern fn getpid() i32;
+
pub fn onData(
this: *Context,
socket: Socket,
@@ -133,10 +200,6 @@ pub fn NewIPCHandler(comptime Context: type) type {
var data = data_;
log("onData {}", .{std.fmt.fmtSliceHexLower(data)});
- // if (comptime Context == bun.JSC.VirtualMachine.IPCInstance) {
- // logDataOnly("{d} -> '{}'", .{ getpid(), std.fmt.fmtSliceHexLower(data) });
- // }
-
// In the VirtualMachine case, `globalThis` is an optional, in case
// the vm is freed before the socket closes.
var globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) {
@@ -154,11 +217,11 @@ pub fn NewIPCHandler(comptime Context: type) type {
// Decode the message with just the temporary buffer, and if that
// fails (not enough bytes) then we allocate to .ipc_buffer
- if (this.ipc_buffer.len == 0) {
+ if (this.ipc.incoming.len == 0) {
while (true) {
const result = decodeIPCMessage(data, globalThis) catch |e| switch (e) {
error.NotEnoughBytes => {
- _ = this.ipc_buffer.write(bun.default_allocator, data) catch @panic("OOM");
+ _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM");
log("hit NotEnoughBytes", .{});
return;
},
@@ -180,15 +243,15 @@ pub fn NewIPCHandler(comptime Context: type) type {
}
}
- _ = this.ipc_buffer.write(bun.default_allocator, data) catch @panic("OOM");
+ _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM");
- var slice = this.ipc_buffer.slice();
+ var slice = this.ipc.incoming.slice();
while (true) {
const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) {
error.NotEnoughBytes => {
// copy the remaining bytes to the start of the buffer
- bun.copy(u8, this.ipc_buffer.ptr[0..slice.len], slice);
- this.ipc_buffer.len = @truncate(slice.len);
+ bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice);
+ this.ipc.incoming.len = @truncate(slice.len);
log("hit NotEnoughBytes2", .{});
return;
},
@@ -206,34 +269,47 @@ pub fn NewIPCHandler(comptime Context: type) type {
slice = slice[result.bytes_consumed..];
} else {
// clear the buffer
- this.ipc_buffer.len = 0;
+ this.ipc.incoming.len = 0;
return;
}
}
}
pub fn onWritable(
- _: *Context,
- _: Socket,
- ) void {}
+ context: *Context,
+ socket: Socket,
+ ) void {
+ const to_write = context.ipc.outgoing.list.ptr[context.ipc.outgoing.cursor..context.ipc.outgoing.list.len];
+ if (to_write.len == 0) {
+ context.ipc.outgoing.cursor = 0;
+ context.ipc.outgoing.list.len = 0;
+ return;
+ }
+ const n = socket.write(to_write, false);
+ if (n == to_write.len) {
+ context.ipc.outgoing.cursor = 0;
+ context.ipc.outgoing.list.len = 0;
+ } else if (n > 0) {
+ context.ipc.outgoing.cursor += @intCast(n);
+ }
+ }
+
pub fn onTimeout(
_: *Context,
_: Socket,
) void {}
+
pub fn onConnectError(
- _: *Context,
+ _: *anyopaque,
_: Socket,
_: c_int,
- ) void {}
+ ) void {
+ // context has not been initialized
+ }
+
pub fn onEnd(
_: *Context,
_: Socket,
) void {}
};
}
-
-/// This is used for Bun.spawn() IPC because otherwise we would have to copy the data once to get it to zig, then write it.
-/// Returns `true` on success, `false` on failure + throws a JS error.
-extern fn Bun__serializeJSValueForSubprocess(global: *JSC.JSGlobalObject, value: JSValue, fd: bun.FileDescriptor) bool;
-
-pub const serializeJSValueForSubprocess = Bun__serializeJSValueForSubprocess;