diff options
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r-- | src/sql/postgres.zig | 199 |
1 files changed, 115 insertions, 84 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index 203324445..df6a976b6 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -3,7 +3,11 @@ const JSC = bun.JSC; const String = bun.String; const uws = bun.uws; const std = @import("std"); -const debug = bun.Output.scoped(.Postgres, true); +const debug = bun.Output.scoped(.Postgres, false); +const int32 = u32; +const PostgresInt32 = int32; +const short = u16; +const PostgresShort = u16; const Data = union(enum) { owned: bun.ByteList, temporary: []const u8, @@ -144,7 +148,7 @@ pub const protocol = struct { try pwriteFn(this.wrapped, data, i); } - pub fn @"i32"(this: @This(), value: i32) !void { + pub fn int32(this: @This(), value: PostgresInt32) !void { try this.write(std.mem.asBytes(&@byteSwap(value))); } @@ -152,7 +156,7 @@ pub const protocol = struct { try this.write(std.mem.asBytes(&@byteSwap(@as(u64, @bitCast(value))))); } - pub fn @"i16"(this: @This(), value: i16) !void { + pub fn short(this: @This(), value: PostgresShort) !void { try this.write(std.mem.asBytes(&@byteSwap(value))); } @@ -173,7 +177,7 @@ pub const protocol = struct { } pub fn @"null"(this: @This()) !void { - try this.i32(-1); + try this.int32(std.math.maxInt(PostgresInt32)); } pub fn String(this: @This(), value: bun.String) !void { @@ -393,21 +397,21 @@ pub const protocol = struct { pub fn expectInt(this: @This(), comptime Int: type, comptime value: comptime_int) !bool { var actual = try this.int(Int); - return actual != value; + return actual == value; } - pub fn @"i32"(this: @This()) !i32 { - return this.int(i32); + pub fn int32(this: @This()) !PostgresInt32 { + return this.int(PostgresInt32); } - pub fn @"i16"(this: @This()) !i16 { - return this.int(i16); + pub fn short(this: @This()) !PostgresShort { + return this.int(PostgresShort); } - pub fn length(this: @This()) !i32 { - const expected = try this.int(i32); + pub fn length(this: @This()) !PostgresInt32 { + const expected = try this.int(PostgresInt32); if (expected > -1) { - try this.ensureCapacity(@intCast(expected)); + try this.ensureCapacity(@intCast(expected -| 4)); } return expected; @@ -483,7 +487,7 @@ pub const protocol = struct { pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { const message_length = try reader.length(); - switch (try reader.i32()) { + switch (try reader.int32()) { 0 => { if (message_length != 8) return error.InvalidMessageLength; this.* = .{ .Ok = {} }; @@ -610,12 +614,12 @@ pub const protocol = struct { pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { if (!try reader.expectInt(u32, 12)) { - return error.InvalidMessage; + return error.InvalidBackendKeyData; } this.* = .{ - .process_id = @bitCast(try reader.i32()), - .secret_key = @bitCast(try reader.i32()), + .process_id = @bitCast(try reader.int32()), + .secret_key = @bitCast(try reader.int32()), }; } }; @@ -701,7 +705,7 @@ pub const protocol = struct { pub const Terminate = [_]u8{'X'} ++ toBytes(Int32(4)); fn Int32(value: anytype) [4]u8 { - return @bitCast(@byteSwap(@as(i32, @intCast(value)))); + return @bitCast(@byteSwap(@as(int32, @intCast(value)))); } const toBytes = std.mem.toBytes; @@ -742,7 +746,7 @@ pub const protocol = struct { text, binary, - pub fn from(value: i16) !FormatCode { + pub fn from(value: short) !FormatCode { return switch (value) { 0 => .text, 1 => .binary, @@ -756,24 +760,16 @@ pub const protocol = struct { var remaining_bytes = try reader.length(); remaining_bytes -|= 4; - var remaining_fields: usize = @intCast(@max(try reader.i16(), 0)); + var remaining_fields: usize = @intCast(@max(try reader.short(), 0)); for (0..remaining_fields) |index| { - const byte_length = try reader.i32(); + const byte_length = try reader.int32(); switch (byte_length) { 0 => break, else => { var bytes = try reader.bytes(@intCast(byte_length)); if (!try forEach(context, @intCast(index), &bytes)) break; }, - - -1 => { - if (!try forEach(context, @intCast(index), null)) break; - }, - - std.math.minInt(i32)...-2 => { - return error.InvalidMessageLength; - }, } } } @@ -783,9 +779,9 @@ pub const protocol = struct { pub const FieldDescription = struct { name: Data = .{ .empty = {} }, - table_oid: i32 = 0, - column_index: i16 = 0, - type_oid: i16 = 0, + table_oid: int32 = 0, + column_index: short = 0, + type_oid: short = 0, pub fn deinit(this: *@This()) void { this.name.deinit(); @@ -797,9 +793,9 @@ pub const protocol = struct { name.deinit(); } this.* = .{ - .table_oid = try reader.i32(), - .column_index = try reader.i16(), - .type_oid = @truncate(try reader.i32()), + .table_oid = try reader.int32(), + .column_index = try reader.short(), + .type_oid = @truncate(try reader.int32()), .name = .{ .owned = try name.toOwned() }, }; @@ -823,7 +819,7 @@ pub const protocol = struct { var remaining_bytes = try reader.length(); remaining_bytes -|= 4; - const field_count: usize = @intCast(@max(try reader.i16(), 0)); + const field_count: usize = @intCast(@max(try reader.short(), 0)); var fields = try bun.default_allocator.alloc( FieldDescription, field_count, @@ -849,18 +845,18 @@ pub const protocol = struct { }; pub const ParameterDescription = struct { - parameters: []i32 = &[_]i32{}, + parameters: []int32 = &[_]int32{}, pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { var remaining_bytes = try reader.length(); remaining_bytes -|= 4; - const count = try reader.i16(); - var parameters = try bun.default_allocator.alloc(i32, @intCast(@max(count, 0))); + const count = try reader.short(); + var parameters = try bun.default_allocator.alloc(int32, @intCast(@max(count, 0))); - var data = try reader.read(@as(usize, @intCast(@max(count, 0))) * @sizeOf((i32))); + var data = try reader.read(@as(usize, @intCast(@max(count, 0))) * @sizeOf((int32))); defer data.deinit(); - const input_params: []align(1) const i32 = toInt32Slice(i32, data.slice()); + const input_params: []align(1) const int32 = toInt32Slice(int32, data.slice()); for (input_params, parameters) |src, *dest| { dest.* = @byteSwap(src); } @@ -879,7 +875,7 @@ pub const protocol = struct { } pub const NotificationResponse = struct { - pid: i32 = 0, + pid: int32 = 0, channel: bun.ByteList = .{}, payload: bun.ByteList = .{}, @@ -893,7 +889,7 @@ pub const protocol = struct { std.debug.assert(length >= 4); this.* = .{ - .pid = try reader.i32(), + .pid = try reader.int32(), .channel = (try reader.readZ()).toOwned(), .payload = (try reader.readZ()).toOwned(), }; @@ -925,7 +921,7 @@ pub const protocol = struct { pub const Parse = struct { name: []const u8 = "", query: []const u8 = "", - params: []const i32 = &.{}, + params: []const int32 = &.{}, pub fn deinit(this: *Parse) void { _ = this; @@ -944,9 +940,9 @@ pub const protocol = struct { try writer.write(&header); try writer.string(this.name); try writer.string(this.query); - try writer.i16(@intCast(parameters.len)); + try writer.short(@intCast(parameters.len)); for (parameters) |parameter| { - try writer.i32(parameter); + try writer.int32(parameter); } } @@ -1082,14 +1078,18 @@ pub const protocol = struct { const database = this.database.slice(); const options = this.options.slice(); - const count: usize = @sizeOf((i32)) + @sizeOf((i32)) + user.len + 1 + database.len + 1 + options.len + 1; + const count: usize = @sizeOf((int32)) + @sizeOf((int32)) + zCount("user", user) + zCount("database", database) + zCount("client_encoding", "UTF8") + zCount("", options) + 1; const header = toBytes(Int32(@as(u32, @truncate(count)))); try writer.write(&header); - try writer.i32(196608); + try writer.int32(196608); + + try writer.string("user"); if (user.len > 0) try writer.string(user); + try writer.string("database"); + if (database.len == 0) { // The database to connect to. Defaults to the user name. try writer.string(user); @@ -1097,15 +1097,32 @@ pub const protocol = struct { try writer.string(database); } - try writer.string(options); + try writer.string("client_encoding"); + try writer.string("UTF8"); + + if (options.len > 0) + try writer.string(options); + try writer.write(&[_]u8{0}); } pub const write = writeWrap(@This(), writeInternal).write; }; + fn zCount(prefix: []const u8, slice: []const u8) usize { + if (slice.len > 0) { + return slice.len + 1 + prefix.len + 1; + } + + if (prefix.len > 0) { + return prefix.len + 1; + } + + return 0; + } + pub const Execute = struct { - max_rows: i32 = 0, + max_rows: int32 = 0, p: PortalOrPreparedStatement, pub fn writeInternal( @@ -1120,7 +1137,7 @@ pub const protocol = struct { } ++ toBytes(Int32(count)); try writer.write(&header); try writer.string(message); - try writer.i32(this.max_rows); + try writer.int32(this.max_rows); } pub const write = writeWrap(@This(), writeInternal).write; @@ -1174,7 +1191,7 @@ pub const protocol = struct { }; pub const NegotiateProtocolVersion = struct { - version: i32 = 0, + version: int32 = 0, unrecognized_options: std.ArrayListUnmanaged(String) = .{}, pub fn decodeInternal( @@ -1185,12 +1202,12 @@ pub const protocol = struct { const length = try reader.length(); std.debug.assert(length >= 4); - const version = try reader.i32(); + const version = try reader.int32(); this.* = .{ .version = version, }; - const unrecognized_options_count: u32 = @intCast(@max(try reader.i32(), 0)); + const unrecognized_options_count: u32 = @intCast(@max(try reader.int32(), 0)); try this.unrecognized_options.ensureTotalCapacity(bun.default_allocator, unrecognized_options_count); errdefer { for (this.unrecognized_options.items) |*option| { @@ -1234,7 +1251,7 @@ pub const protocol = struct { message: Data = .{ .empty = {} }, pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - _ = try reader.i32(); + _ = try reader.int32(); const message = try reader.readZ(); this.* = .{ @@ -1287,7 +1304,7 @@ pub const protocol = struct { }; pub const types = struct { - pub const Tag = enum(i16) { + pub const Tag = enum(short) { string = 25, number = 0, json = 114, @@ -1390,7 +1407,7 @@ pub const types = struct { pub const string = struct { pub const to = 25; - pub const from = [_]i16{}; + pub const from = [_]short{}; pub fn toJSWithType( globalThis: *JSC.JSGlobalObject, @@ -1433,7 +1450,7 @@ pub const types = struct { pub const number = struct { pub const to = 0; - pub const from = [_]i16{ 21, 23, 26, 700, 701 }; + pub const from = [_]short{ 21, 23, 26, 700, 701 }; pub fn toJS( _: *JSC.JSGlobalObject, @@ -1445,7 +1462,7 @@ pub const types = struct { pub const json = struct { pub const to = 114; - pub const from = [_]i16{ 114, 3802 }; + pub const from = [_]short{ 114, 3802 }; pub fn toJS( globalObject: *JSC.JSGlobalObject, @@ -1466,7 +1483,7 @@ pub const types = struct { pub const boolean = struct { pub const to = 16; - pub const from = [_]i16{16}; + pub const from = [_]short{16}; pub fn toJS( _: *JSC.JSGlobalObject, @@ -1478,7 +1495,7 @@ pub const types = struct { pub const date = struct { pub const to = 1184; - pub const from = [_]i16{ 1082, 1114, 1184 }; + pub const from = [_]short{ 1082, 1114, 1184 }; pub fn toJS( globalObject: *JSC.JSGlobalObject, @@ -1491,7 +1508,7 @@ pub const types = struct { pub const bytea = struct { pub const to = 17; - pub const from = [_]i16{17}; + pub const from = [_]short{17}; pub fn toJS( globalObject: *JSC.JSGlobalObject, @@ -1777,18 +1794,18 @@ pub const PostgresRequest = struct { ) !void { try writer.bytes("B"); const length_offset = writer.offset(); - try writer.i32(0); + try writer.int32(0); try writer.string(name); try writer.String(cursor_name); var iter = JSC.JSArrayIterator.init(values_array, globalObject); - try writer.i16(@intCast(iter.len)); + try writer.short(@intCast(iter.len)); while (iter.next()) |value| { if (value.isUndefinedOrNull()) { - try writer.i16(0); + try writer.short(0); continue; } @@ -1796,15 +1813,15 @@ pub const PostgresRequest = struct { switch (tag) { .bytea, .number => { - try writer.i16(0); + try writer.short(0); }, else => { - try writer.i16(1); + try writer.short(1); }, } } - try writer.i16(@intCast(iter.len)); + try writer.short(@intCast(iter.len)); iter = JSC.JSArrayIterator.init(values_array, globalObject); @@ -1813,7 +1830,7 @@ pub const PostgresRequest = struct { while (iter.next()) |value| { if (value.isUndefinedOrNull()) { debug(" -> NULL", .{}); - try writer.i32(4); + try writer.int32(4); try writer.null(); continue; } @@ -1823,10 +1840,10 @@ pub const PostgresRequest = struct { .number => { debug(" -> {s}", .{@tagName(tag)}); if (value.isInt32()) { - try writer.i32(4); - try writer.i32(value.to(i32)); + try writer.int32(4); + try writer.int32(value.to(int32)); } else { - try writer.i32(8); + try writer.int32(8); try writer.f64(value.coerceToDouble(globalObject)); } }, @@ -1852,7 +1869,7 @@ pub const PostgresRequest = struct { if (value.asArrayBuffer(globalObject)) |buf| { bytes = buf.byteSlice(); } - try writer.i32(@intCast(bytes.len)); + try writer.int32(@intCast(bytes.len)); debug(" -> {s}: {d}", .{ @tagName(tag), bytes.len }); try writer.bytes(bytes); @@ -1866,13 +1883,13 @@ pub const PostgresRequest = struct { } } - try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(i32, @intCast(writer.offset())))), length_offset); + try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(int32, @intCast(writer.offset())))), length_offset); } pub fn writeQuery( query: []const u8, name: []const u8, - params: []const i32, + params: []const int32, comptime Context: type, writer: protocol.NewWriter(Context), ) !void { @@ -2108,6 +2125,8 @@ pub const PostgresSQLConnection = struct { socket.close(); this.fail("Failed to write startup message", err); }; + + this.flushData(); } pub fn onTimeout(this: *PostgresSQLConnection) void { @@ -2130,6 +2149,11 @@ pub const PostgresSQLConnection = struct { this.read_buffer.byte_list.len = 0; this.read_buffer.write(bun.default_allocator, data[offset..]) catch @panic("failed to write to read buffer"); } else { + if (comptime bun.Environment.allow_assert) { + if (@errorReturnTrace()) |trace| { + debug("Error: {s}\n{}", .{ @errorName(err), trace }); + } + } this.fail("Failed to read data", err); } }; @@ -2140,6 +2164,11 @@ pub const PostgresSQLConnection = struct { this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer"); PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| { if (err != error.ShortRead) { + if (comptime bun.Environment.allow_assert) { + if (@errorReturnTrace()) |trace| { + debug("Error: {s}\n{}", .{ @errorName(err), trace }); + } + } this.fail("Failed to read data", err); } return; @@ -2210,17 +2239,17 @@ pub const PostgresSQLConnection = struct { .statements = PreparedStatementsMap{}, }; - ptr.socket = socket: { + { const hostname = hostname_str.toUTF8(bun.default_allocator); defer hostname.deinit(); if (tls_object.isEmptyOrUndefinedOrNull()) { var ctx = vm.rareData().postgresql_context.tcp orelse brk: { var ctx_ = uws.us_create_bun_socket_context(0, vm.event_loop_handle, @sizeOf(*PostgresSQLConnection), uws.us_bun_socket_context_options_t{}).?; - uws.NewSocketHandler(false).configure(ctx_, false, PostgresSQLConnection, SocketHandler(false)); + uws.NewSocketHandler(false).configure(ctx_, true, *PostgresSQLConnection, SocketHandler(false)); vm.rareData().postgresql_context.tcp = ctx_; break :brk ctx_; }; - break :socket Socket{ + ptr.socket = .{ .SocketTCP = uws.SocketTCP.connectAnon(hostname.slice(), port, ctx, ptr) orelse { globalObject.throwError(error.ConnectionFailed, "failed to connect to postgresql"); ptr.deinit(); @@ -2233,7 +2262,9 @@ pub const PostgresSQLConnection = struct { ptr.deinit(); return null; } - }; + } + ptr.updateHasPendingActivity(); + ptr.poll_ref.ref(vm); return ptr; } @@ -2447,10 +2478,10 @@ pub const PostgresSQLConnection = struct { putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNull()); }, 2 => { - putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(i32, @as(i16, @bitCast(bytes[0..2].*))))); + putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(int32, @as(short, @bitCast(bytes[0..2].*))))); }, 4 => { - putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(i32, @bitCast(bytes[0..4].*)))); + putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(int32, @bitCast(bytes[0..4].*)))); }, else => { var eight: usize = 0; @@ -2664,7 +2695,7 @@ pub const PostgresSQLStatement = struct { cached_structure: JSC.Strong = .{}, ref_count: u32 = 1, fields: []const protocol.FieldDescription = &[_]protocol.FieldDescription{}, - parameters: []const i32 = &[_]i32{}, + parameters: []const int32 = &[_]int32{}, signature: Signature, pub fn ref(this: *@This()) void { std.debug.assert(this.ref_count > 0); @@ -2717,7 +2748,7 @@ pub const PostgresSQLStatement = struct { }; const Signature = struct { - fields: []const i32, + fields: []const int32, name: []const u8, query: []const u8, @@ -2736,7 +2767,7 @@ const Signature = struct { } pub fn generate(globalObject: *JSC.JSGlobalObject, query: []const u8, array_value: JSC.JSValue) !Signature { - var fields = std.ArrayList(i32).init(bun.default_allocator); + var fields = std.ArrayList(int32).init(bun.default_allocator); var name = try std.ArrayList(u8).initCapacity(bun.default_allocator, query.len); errdefer { @@ -2748,7 +2779,7 @@ const Signature = struct { while (iter.next()) |value| { if (value.isUndefinedOrNull()) { - try fields.append(@byteSwap(@as(i32, -1))); + try fields.append(@byteSwap(@as(int32, std.math.maxInt(int32)))); try name.appendSlice(".null"); continue; } |