diff options
Diffstat (limited to 'src/bun.js/ipc.zig')
-rw-r--r-- | src/bun.js/ipc.zig | 144 |
1 files changed, 110 insertions, 34 deletions
diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig index 57fcef75f..a0742a0c4 100644 --- a/src/bun.js/ipc.zig +++ b/src/bun.js/ipc.zig @@ -35,6 +35,11 @@ pub const IPCMessageType = enum(u8) { _, }; +pub const IPCBuffer = struct { + list: bun.ByteList = .{}, + cursor: u32 = 0, +}; + /// Given potentially unfinished buffer `data`, attempt to decode and process a message from it. /// Returns `NotEnoughBytes` if there werent enough bytes /// Returns `InvalidFormat` if the message was invalid, probably close the socket in this case @@ -89,12 +94,74 @@ pub fn decodeIPCMessage( pub const Socket = uws.NewSocketHandler(false); +pub const IPCData = struct { + socket: Socket, + incoming: bun.ByteList = .{}, // Maybe we should use IPCBuffer here as well + outgoing: IPCBuffer = .{}, + + has_written_version: if (Environment.allow_assert) u1 else u0 = 0, + + pub fn writeVersionPacket(this: *IPCData) void { + if (Environment.allow_assert) { + std.debug.assert(this.has_written_version == 0); + } + const VersionPacket = extern struct { + type: IPCMessageType align(1) = .Version, + version: u32 align(1) = ipcVersion, + }; + const bytes = comptime std.mem.asBytes(&VersionPacket{}); + const n = this.socket.write(bytes, false); + if (n != bytes.len) { + var list = this.outgoing.list.listManaged(bun.default_allocator); + list.appendSlice(bytes) catch @panic("OOM"); + } + if (Environment.allow_assert) { + this.has_written_version = 1; + } + } + + pub fn serializeAndSend(ipc_data: *IPCData, globalThis: *JSGlobalObject, value: JSValue) bool { + if (Environment.allow_assert) { + std.debug.assert(ipc_data.has_written_version == 1); + } + + const serialized = value.serialize(globalThis) orelse return false; + defer serialized.deinit(); + + const size: u32 = @intCast(serialized.data.len); + + const payload_length: usize = @sizeOf(IPCMessageType) + @sizeOf(u32) + size; + + ipc_data.outgoing.list.ensureUnusedCapacity(bun.default_allocator, payload_length) catch @panic("OOM"); + const start_offset = ipc_data.outgoing.list.len; + + ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u8, @intFromEnum(IPCMessageType.SerializedMessage)); + ipc_data.outgoing.list.writeTypeAsBytesAssumeCapacity(u32, size); + ipc_data.outgoing.list.appendSliceAssumeCapacity(serialized.data); + + std.debug.assert(ipc_data.outgoing.list.len == start_offset + payload_length); + + if (start_offset == 0) { + std.debug.assert(ipc_data.outgoing.cursor == 0); + + const n = ipc_data.socket.write(ipc_data.outgoing.list.ptr[start_offset..payload_length], false); + if (n == payload_length) { + ipc_data.outgoing.list.len = 0; + } else if (n > 0) { + ipc_data.outgoing.cursor = @intCast(n); + } + } + + return true; + } +}; + /// This type is shared between VirtualMachine and Subprocess for their respective IPC handlers /// /// `Context` must be a struct that implements this interface: /// struct { /// globalThis: ?*JSGlobalObject, -/// ipc_buffer: bun.ByteList, +/// ipc: IPCData, /// /// fn handleIPCMessage(*Context, DecodedIPCMessage) void /// fn handleIPCClose(*Context, Socket) void @@ -102,18 +169,18 @@ pub const Socket = uws.NewSocketHandler(false); pub fn NewIPCHandler(comptime Context: type) type { return struct { pub fn onOpen( - _: *Context, - socket: Socket, + _: *anyopaque, + _: Socket, ) void { - // Write the version message - const Data = extern struct { - type: IPCMessageType align(1) = .Version, - version: u32 align(1) = ipcVersion, - }; - const data: []const u8 = comptime @as([@sizeOf(Data)]u8, @bitCast(Data{}))[0..]; - _ = socket.write(data, false); - socket.flush(); + // it is NOT safe to use the first argument here because it has not been initialized yet. + // ideally we would call .ipc.writeVersionPacket() here, and we need that to handle the + // theoretical write failure, but since the .ipc.outgoing buffer isn't available, that + // data has nowhere to go. + // + // therefore, initializers of IPC handlers need to call .ipc.writeVersionPacket() themselves + // this is covered by an assertion. } + pub fn onClose( this: *Context, socket: Socket, @@ -124,7 +191,7 @@ pub fn NewIPCHandler(comptime Context: type) type { log("onClose\n", .{}); this.handleIPCClose(socket); } - // extern fn getpid() i32; + pub fn onData( this: *Context, socket: Socket, @@ -133,10 +200,6 @@ pub fn NewIPCHandler(comptime Context: type) type { var data = data_; log("onData {}", .{std.fmt.fmtSliceHexLower(data)}); - // if (comptime Context == bun.JSC.VirtualMachine.IPCInstance) { - // logDataOnly("{d} -> '{}'", .{ getpid(), std.fmt.fmtSliceHexLower(data) }); - // } - // In the VirtualMachine case, `globalThis` is an optional, in case // the vm is freed before the socket closes. var globalThis = switch (@typeInfo(@TypeOf(this.globalThis))) { @@ -154,11 +217,11 @@ pub fn NewIPCHandler(comptime Context: type) type { // Decode the message with just the temporary buffer, and if that // fails (not enough bytes) then we allocate to .ipc_buffer - if (this.ipc_buffer.len == 0) { + if (this.ipc.incoming.len == 0) { while (true) { const result = decodeIPCMessage(data, globalThis) catch |e| switch (e) { error.NotEnoughBytes => { - _ = this.ipc_buffer.write(bun.default_allocator, data) catch @panic("OOM"); + _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM"); log("hit NotEnoughBytes", .{}); return; }, @@ -180,15 +243,15 @@ pub fn NewIPCHandler(comptime Context: type) type { } } - _ = this.ipc_buffer.write(bun.default_allocator, data) catch @panic("OOM"); + _ = this.ipc.incoming.write(bun.default_allocator, data) catch @panic("OOM"); - var slice = this.ipc_buffer.slice(); + var slice = this.ipc.incoming.slice(); while (true) { const result = decodeIPCMessage(slice, globalThis) catch |e| switch (e) { error.NotEnoughBytes => { // copy the remaining bytes to the start of the buffer - bun.copy(u8, this.ipc_buffer.ptr[0..slice.len], slice); - this.ipc_buffer.len = @truncate(slice.len); + bun.copy(u8, this.ipc.incoming.ptr[0..slice.len], slice); + this.ipc.incoming.len = @truncate(slice.len); log("hit NotEnoughBytes2", .{}); return; }, @@ -206,34 +269,47 @@ pub fn NewIPCHandler(comptime Context: type) type { slice = slice[result.bytes_consumed..]; } else { // clear the buffer - this.ipc_buffer.len = 0; + this.ipc.incoming.len = 0; return; } } } pub fn onWritable( - _: *Context, - _: Socket, - ) void {} + context: *Context, + socket: Socket, + ) void { + const to_write = context.ipc.outgoing.list.ptr[context.ipc.outgoing.cursor..context.ipc.outgoing.list.len]; + if (to_write.len == 0) { + context.ipc.outgoing.cursor = 0; + context.ipc.outgoing.list.len = 0; + return; + } + const n = socket.write(to_write, false); + if (n == to_write.len) { + context.ipc.outgoing.cursor = 0; + context.ipc.outgoing.list.len = 0; + } else if (n > 0) { + context.ipc.outgoing.cursor += @intCast(n); + } + } + pub fn onTimeout( _: *Context, _: Socket, ) void {} + pub fn onConnectError( - _: *Context, + _: *anyopaque, _: Socket, _: c_int, - ) void {} + ) void { + // context has not been initialized + } + pub fn onEnd( _: *Context, _: Socket, ) void {} }; } - -/// This is used for Bun.spawn() IPC because otherwise we would have to copy the data once to get it to zig, then write it. -/// Returns `true` on success, `false` on failure + throws a JS error. -extern fn Bun__serializeJSValueForSubprocess(global: *JSC.JSGlobalObject, value: JSValue, fd: bun.FileDescriptor) bool; - -pub const serializeJSValueForSubprocess = Bun__serializeJSValueForSubprocess; |