diff options
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r-- | src/sql/postgres.zig | 2205 |
1 files changed, 1154 insertions, 1051 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index a35ee7120..37bc47b21 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -3,1261 +3,1316 @@ const JSC = bun.JSC; const String = bun.String; const uws = bun.uws; const std = @import("std"); -pub const Postgres = struct { - const Data = union(enum) { - owned: bun.ByteList, - temporary: []const u8, - empty: void, +const Data = union(enum) { + owned: bun.ByteList, + temporary: []const u8, + empty: void, + + pub fn toOwned(this: @This()) !bun.ByteList { + return switch (this) { + .owned => this.owned, + .temporary => bun.ByteList.init(try bun.default_allocator.dupe(u8, this.temporary)), + .empty => bun.ByteList.init(&.{}), + }; + } - pub fn toOwned(this: @This()) !bun.ByteList { - return switch (this) { - .owned => this.owned, - .temporary => bun.ByteList.init(try bun.default_allocator.dupe(u8, this.temporary)), - .empty => bun.ByteList.init(&.{}), - }; + pub fn deinit(this: *@This()) void { + switch (this) { + .owned => this.owned.deinitWithAllocator(bun.default_allocator), + .temporary => {}, + .empty => {}, } + } - pub fn deinit(this: *@This()) void { - switch (this) { - .owned => this.owned.deinitWithAllocator(bun.default_allocator), - .temporary => {}, - .empty => {}, - } - } + pub fn slice(this: @This()) []const u8 { + return switch (this) { + .owned => this.owned.slice(), + .temporary => this.temporary, + .empty => "", + }; + } - pub fn slice(this: @This()) []const u8 { - return switch (this) { - .owned => this.owned.slice(), - .temporary => this.temporary, - .empty => "", - }; - } + pub fn sliceZ(this: @This()) [:0]const u8 { + return switch (this) { + .owned => this.owned.slice()[0..this.owned.len :0], + .temporary => this.temporary, + .empty => "", + }; + } +}; - pub fn sliceZ(this: @This()) [:0]const u8 { - return switch (this) { - .owned => this.owned.slice()[0..this.owned.len :0], - .temporary => this.temporary, - .empty => "", - }; - } - }; +pub const protocol = struct { + pub fn NewWriterWrap(comptime Context: type, comptime writeFunction_: (fn (ctx: Context, bytes: []const u8) anyerror!void)) type { + return struct { + wrapped: Context, - pub const Protocol = struct { - pub fn NewWriterWrap(comptime Context: type, comptime writeFunction_: (fn (ctx: Context, bytes: []const u8) anyerror!void)) type { - return struct { - wrapped: Context, + const writeFn = writeFunction_; + pub const Ctx = Context; - const writeFn = writeFunction_; - pub const Ctx = Context; + pub inline fn write(this: @This(), data: []const u8) anyerror!void { + try writeFn(this.wrapped, data); + } - pub inline fn write(this: @This(), data: []const u8) anyerror!void { - try writeFn(this.wrapped, data); - } + pub fn @"i32"(this: @This(), value: i32) !void { + try this.write(std.mem.asBytes(&@byteSwap(value))); + } - pub fn @"i32"(this: @This(), value: i32) !void { - try this.write(std.mem.asBytes(&@byteSwap(value))); - } + pub fn @"i16"(this: @This(), value: i16) !void { + try this.write(std.mem.asBytes(&@byteSwap(value))); + } - pub fn @"i16"(this: @This(), value: i16) !void { - try this.write(std.mem.asBytes(&@byteSwap(value))); - } + pub fn string(this: @This(), value: []const u8) !void { + try this.write(value); + if (value.len == 0 or value[value.len - 1] != 0) + try this.write(&[_]u8{0}); + } - pub fn string(this: @This(), value: []const u8) !void { - try this.write(value); - if (value.len == 0 or value[value.len - 1] != 0) - try this.write(&[_]u8{0}); - } + pub fn bytes(this: @This(), value: []const u8) !void { + try this.write(value); + if (value.len == 0 or value[value.len - 1] != 0) + try this.write(&[_]u8{0}); + } - pub fn bytes(this: @This(), value: []const u8) !void { - try this.write(value); - if (value.len == 0 or value[value.len - 1] != 0) - try this.write(&[_]u8{0}); - } + pub fn String(this: @This(), value: bun.String) !void { + var sliced = value.toUTF8(bun.default_allocator); + defer sliced.deinit(); + var slice = sliced.slice(); - pub fn String(this: @This(), value: bun.String) !void { - var sliced = value.toUTF8(bun.default_allocator); - defer sliced.deinit(); - var slice = sliced.slice(); + try this.write(slice); + if (slice.len == 0 or slice[slice.len - 1] != 0) + try this.write(&[_]u8{0}); + } + }; + } - try this.write(slice); - if (slice.len == 0 or slice[slice.len - 1] != 0) - try this.write(&[_]u8{0}); - } - }; - } + pub const FieldType = enum(u8) { + /// Severity: the field contents are ERROR, FATAL, or PANIC (in an error message), or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message), or a localized translation of one of these. Always present. + S = 'S', - pub const FieldType = enum(u8) { - /// Severity: the field contents are ERROR, FATAL, or PANIC (in an error message), or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message), or a localized translation of one of these. Always present. - S = 'S', + /// Severity: the field contents are ERROR, FATAL, or PANIC (in an error message), or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message). This is identical to the S field except that the contents are never localized. This is present only in messages generated by PostgreSQL versions 9.6 and later. + V = 'V', - /// Severity: the field contents are ERROR, FATAL, or PANIC (in an error message), or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message). This is identical to the S field except that the contents are never localized. This is present only in messages generated by PostgreSQL versions 9.6 and later. - V = 'V', + /// Code: the SQLSTATE code for the error (see Appendix A). Not localizable. Always present. + C = 'C', - /// Code: the SQLSTATE code for the error (see Appendix A). Not localizable. Always present. - C = 'C', + /// Message: the primary human-readable error message. This should be accurate but terse (typically one line). Always present. + M = 'M', - /// Message: the primary human-readable error message. This should be accurate but terse (typically one line). Always present. - M = 'M', + /// Detail: an optional secondary error message carrying more detail about the problem. Might run to multiple lines. + D = 'D', - /// Detail: an optional secondary error message carrying more detail about the problem. Might run to multiple lines. - D = 'D', + /// Hint: an optional suggestion what to do about the problem. This is intended to differ from Detail in that it offers advice (potentially inappropriate) rather than hard facts. Might run to multiple lines. + H = 'H', - /// Hint: an optional suggestion what to do about the problem. This is intended to differ from Detail in that it offers advice (potentially inappropriate) rather than hard facts. Might run to multiple lines. - H = 'H', + /// Position: the field value is a decimal ASCII integer, indicating an error cursor position as an index into the original query string. The first character has index 1, and positions are measured in characters not bytes. + P = 'P', - /// Position: the field value is a decimal ASCII integer, indicating an error cursor position as an index into the original query string. The first character has index 1, and positions are measured in characters not bytes. - P = 'P', + /// Internal position: this is defined the same as the P field, but it is used when the cursor position refers to an internally generated command rather than the one submitted by the client. The q field will always appear when this field appears. + p = 'p', - /// Internal position: this is defined the same as the P field, but it is used when the cursor position refers to an internally generated command rather than the one submitted by the client. The q field will always appear when this field appears. - p = 'p', + /// Internal query: the text of a failed internally-generated command. This could be, for example, an SQL query issued by a PL/pgSQL function. + q = 'q', - /// Internal query: the text of a failed internally-generated command. This could be, for example, an SQL query issued by a PL/pgSQL function. - q = 'q', + /// Where: an indication of the context in which the error occurred. Presently this includes a call stack traceback of active procedural language functions and internally-generated queries. The trace is one entry per line, most recent first. + W = 'W', - /// Where: an indication of the context in which the error occurred. Presently this includes a call stack traceback of active procedural language functions and internally-generated queries. The trace is one entry per line, most recent first. - W = 'W', + /// Schema name: if the error was associated with a specific database object, the name of the schema containing that object, if any. + s = 's', - /// Schema name: if the error was associated with a specific database object, the name of the schema containing that object, if any. - s = 's', + /// Table name: if the error was associated with a specific table, the name of the table. (Refer to the schema name field for the name of the table's schema.) + t = 't', - /// Table name: if the error was associated with a specific table, the name of the table. (Refer to the schema name field for the name of the table's schema.) - t = 't', + /// Column name: if the error was associated with a specific table column, the name of the column. (Refer to the schema and table name fields to identify the table.) + c = 'c', - /// Column name: if the error was associated with a specific table column, the name of the column. (Refer to the schema and table name fields to identify the table.) - c = 'c', + /// Data type name: if the error was associated with a specific data type, the name of the data type. (Refer to the schema name field for the name of the data type's schema.) + d = 'd', - /// Data type name: if the error was associated with a specific data type, the name of the data type. (Refer to the schema name field for the name of the data type's schema.) - d = 'd', + /// Constraint name: if the error was associated with a specific constraint, the name of the constraint. Refer to fields listed above for the associated table or domain. (For this purpose, indexes are treated as constraints, even if they weren't created with constraint syntax.) + n = 'n', - /// Constraint name: if the error was associated with a specific constraint, the name of the constraint. Refer to fields listed above for the associated table or domain. (For this purpose, indexes are treated as constraints, even if they weren't created with constraint syntax.) - n = 'n', + /// File: the file name of the source-code location where the error was reported. + F = 'F', - /// File: the file name of the source-code location where the error was reported. - F = 'F', + /// Line: the line number of the source-code location where the error was reported. + L = 'L', - /// Line: the line number of the source-code location where the error was reported. - L = 'L', + /// Routine: the name of the source-code routine reporting the error. + R = 'R', - /// Routine: the name of the source-code routine reporting the error. - R = 'R', + _, + }; - _, - }; + pub const FieldMessage = union(FieldType) { + S: String, + V: String, + C: String, + M: String, + D: String, + H: String, + P: String, + p: String, + q: String, + W: String, + s: String, + t: String, + c: String, + d: String, + n: String, + F: String, + L: String, + R: String, + + pub fn decodeList(comptime Context: type, reader: NewReader(Context)) !std.ArrayListUnmanaged(FieldMessage) { + var messages = std.ArrayListUnmanaged(FieldMessage){}; + while (true) { + const field_int = try reader.int(u8); + if (field_int == 0) break; + const field: FieldType = @intFromEnum(field_int); + + var message = try reader.readZ(); + defer message.deinit(); + if (message.slice().len == 0) break; + + try messages.append(bun.default_allocator, FieldMessage.init(field, message.slice()) catch continue); + } - pub const FieldMessage = union(FieldType) { - S: String, - V: String, - C: String, - M: String, - D: String, - H: String, - P: String, - p: String, - q: String, - W: String, - s: String, - t: String, - c: String, - d: String, - n: String, - F: String, - L: String, - R: String, - - pub fn decodeList(comptime Context: type, reader: NewReader(Context)) !std.ArrayListUnmanaged(FieldMessage) { - var messages = std.ArrayListUnmanaged(FieldMessage){}; - while (true) { - const field_int = try reader.int(u8); - if (field_int == 0) break; - const field: FieldType = @intFromEnum(field_int); - - var message = try reader.readZ(); - defer message.deinit(); - if (message.slice().len == 0) break; - - try messages.append(bun.default_allocator, FieldMessage.init(field, message.slice()) catch continue); - } + return messages; + } + + pub fn init(tag: FieldType, message: []const u8) !FieldMessage { + return switch (tag) { + .S => String.create(message), + .V => String.create(message), + .C => String.create(message), + .M => String.create(message), + .D => String.create(message), + .H => String.create(message), + .P => String.create(message), + .p => String.create(message), + .q => String.create(message), + .W => String.create(message), + .s => String.create(message), + .t => String.create(message), + .c => String.create(message), + .d => String.create(message), + .n => String.create(message), + .F => String.create(message), + .L => String.create(message), + .R => String.create(message), + else => error.UnknownFieldType, + }; + } + }; - return messages; + pub fn NewReaderWrap( + comptime Context: type, + comptime ensureCapacityFn_: (fn (ctx: Context, count: usize) bool), + comptime readFunction_: (fn (ctx: Context, count: usize) anyerror!Data), + comptime readZ_: (fn (ctx: Context, count: usize) anyerror!Data), + ) type { + return struct { + wrapped: Context, + const readFn = readFunction_; + const readZFn = readZ_; + const ensureCapacityFn = ensureCapacityFn_; + + pub const Ctx = Context; + + pub inline fn read(this: @This(), count: usize) anyerror!Data { + return try readFn(this.wrapped, count); } - pub fn init(tag: FieldType, message: []const u8) !FieldMessage { - return switch (tag) { - .S => String.create(message), - .V => String.create(message), - .C => String.create(message), - .M => String.create(message), - .D => String.create(message), - .H => String.create(message), - .P => String.create(message), - .p => String.create(message), - .q => String.create(message), - .W => String.create(message), - .s => String.create(message), - .t => String.create(message), - .c => String.create(message), - .d => String.create(message), - .n => String.create(message), - .F => String.create(message), - .L => String.create(message), - .R => String.create(message), - else => error.UnknownFieldType, - }; + pub inline fn readZ(this: @This()) anyerror!Data { + return try readZFn(this.wrapped); } - }; - pub fn NewReaderWrap( - comptime Context: type, - comptime ensureCapacityFn_: (fn (ctx: Context, count: usize) bool), - comptime readFunction_: (fn (ctx: Context, count: usize) anyerror!Data), - comptime readZ_: (fn (ctx: Context, count: usize) anyerror!Data), - ) type { - return struct { - wrapped: Context, - const readFn = readFunction_; - const readZFn = readZ_; - const ensureCapacityFn = ensureCapacityFn_; - - pub const Ctx = Context; - - pub inline fn read(this: @This(), count: usize) anyerror!Data { - return try readFn(this.wrapped, count); + pub inline fn ensureCapacity(this: @This(), count: usize) anyerror!void { + if (!ensureCapacityFn(this.wrapped, count)) { + return error.ShortRead; } + } - pub inline fn readZ(this: @This()) anyerror!Data { - return try readZFn(this.wrapped); - } + pub fn int(this: @This(), comptime Int: type) !Int { + var data = try this.read(@sizeOf((Int))); + defer data.deinit(); + return @byteSwap(@as(Int, data.slice()[0..@sizeOf(Int)].*)); + } - pub inline fn ensureCapacity(this: @This(), count: usize) anyerror!void { - if (!ensureCapacityFn(this.wrapped, count)) { - return error.ShortRead; - } - } + pub fn expectInt(this: @This(), comptime Int: type, comptime value: comptime_int) !bool { + var actual = try this.int(Int); + return actual != value; + } - pub fn int(this: @This(), comptime Int: type) !Int { - var data = try this.read(@sizeOf((Int))); - defer data.deinit(); - return @byteSwap(@as(Int, data.slice()[0..@sizeOf(Int)].*)); - } + pub fn @"i32"(this: @This()) !i32 { + return this.int(i32); + } - pub fn expectInt(this: @This(), comptime Int: type, comptime value: comptime_int) !bool { - var actual = try this.int(Int); - return actual != value; - } + pub fn @"i16"(this: @This()) !i16 { + return this.int(i16); + } - pub fn @"i32"(this: @This()) !i32 { - return this.int(i32); + pub fn length(this: @This()) !i32 { + const expected = try this.int(i32); + if (expected > -1) { + try this.ensureCapacity(@intCast(expected)); } - pub fn @"i16"(this: @This()) !i16 { - return this.int(i16); - } + return expected; + } - pub fn length(this: @This()) !i32 { - const expected = try this.int(i32); - if (expected > -1) { - try this.ensureCapacity(@intCast(expected)); - } + pub const bytes = read; - return expected; - } + pub fn String(this: @This()) !bun.String { + var result = try this.readZ(); + defer result.deinit(); + return bun.String.fromUTF8(result.slice()); + } + }; + } - pub const bytes = read; + pub fn NewReader(comptime Context: type) type { + return NewReaderWrap(Context, Context.ensureReadCapacity, Context.readData, Context.readDataZ); + } - pub fn String(this: @This()) !bun.String { - var result = try this.readZ(); - defer result.deinit(); - return bun.String.fromUTF8(result.slice()); - } - }; - } + pub fn NewWriter(comptime Context: type) type { + return NewWriterWrap(Context, Context.writeData); + } - pub fn NewReader(comptime Context: type) type { - return NewReaderWrap(Context, Context.ensureReadCapacity, Context.readData, Context.readDataZ); + comptime { + if (@import("builtin").cpu.arch.endian() != .little) { + @compileError("Postgres protocol implementation assumes little endian"); } + } - pub fn NewWriter(comptime Context: type) type { - return NewWriterWrap(Context, Context.writeData); - } + fn decoderWrap(comptime Container: type, comptime decodeFn: anytype) type { + return struct { + pub fn decode(this: *Container, context: anytype) anyerror!void { + const Context = @TypeOf(context); + try decodeFn(this, Context, NewReader(Context){ .wrapped = context }); + } + }; + } - comptime { - if (@import("builtin").cpu.arch.endian() != .little) { - @compileError("Postgres protocol implementation assumes little endian"); + fn writeWrap(comptime Container: type, comptime writeFn: anytype) type { + return struct { + pub fn write(this: *Container, context: anytype) anyerror!void { + const Context = @TypeOf(context); + try writeFn(this, Context, NewWriter(Context){ .wrapped = context }); } - } + }; + } - fn decoderWrap(comptime Container: type, comptime decodeFn: anytype) type { - return struct { - pub fn decode(this: *Container, context: anytype) anyerror!void { - const Context = @TypeOf(context); - try decodeFn(this, Context, NewReader(Context){ .wrapped = context }); - } - }; - } + pub const Authentication = union(enum) { + Ok: void, + ClearTextPassword: struct {}, + MD5Password: struct { + salt: [4]u8, + }, + KerberosV5: struct {}, + SCMCredential: struct {}, + GSS: struct {}, + GSSContinue: struct { + data: Data, + }, + SSPI: struct {}, + SASL: struct { + mechanisms: Data, + data: Data, + }, + SASLContinue: struct { + data: Data, + }, + SASLFinal: struct { + data: Data, + }, + Unknown: void, + + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const message_length = try reader.length(); + + switch (try reader.i32()) { + 0 => { + if (message_length != 8) return error.InvalidMessageLength; + this.* = .{ .Ok = {} }; + }, + 2 => { + if (message_length != 8) return error.InvalidMessageLength; + this.* = .{ + .KerberosV5 = .{}, + }; + }, + 3 => { + if (message_length != 8) return error.InvalidMessageLength; + this.* = .{ + .ClearTextPassword = .{}, + }; + }, + 5 => { + if (message_length != 12) return error.InvalidMessageLength; + try reader.expectInt(u32, 5); + var salt_data = try reader.bytes(4); + defer salt_data.deinit(); + this.* = .{ + .MD5Password = .{ + .salt = salt_data.slice()[0..4].*, + }, + }; + }, + 7 => { + if (message_length != 8) return error.InvalidMessageLength; + this.* = .{ + .GSS = .{}, + }; + }, - fn writeWrap(comptime Container: type, comptime writeFn: anytype) type { - return struct { - pub fn write(this: *Container, context: anytype) anyerror!void { - const Context = @TypeOf(context); - try writeFn(this, Context, NewWriter(Context){ .wrapped = context }); - } - }; - } + 8 => { + if (message_length < 9) return error.InvalidMessageLength; + const remaining = message_length -| (8 - 1); + const bytes = try reader.read(remaining); + this.* = .{ + .GSSContinue = .{ + .data = bytes, + }, + }; + }, + 9 => { + if (message_length != 8) return error.InvalidMessageLength; + this.* = .{ + .SSPI = .{}, + }; + }, - pub const Authentication = union(enum) { - Ok: void, - ClearTextPassword: struct {}, - MD5Password: struct { - salt: [4]u8, - }, - KerberosV5: struct {}, - SCMCredential: struct {}, - GSS: struct {}, - GSSContinue: struct { - data: Data, - }, - SSPI: struct {}, - SASL: struct { - mechanisms: Data, - data: Data, - }, - SASLContinue: struct { - data: Data, - }, - SASLFinal: struct { - data: Data, - }, - Unknown: void, - - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - const message_length = try reader.length(); - - switch (try reader.i32()) { - 0 => { - if (message_length != 8) return error.InvalidMessageLength; - this.* = .{ .Ok = {} }; - }, - 2 => { - if (message_length != 8) return error.InvalidMessageLength; - this.* = .{ - .KerberosV5 = .{}, - }; - }, - 3 => { - if (message_length != 8) return error.InvalidMessageLength; - this.* = .{ - .ClearTextPassword = .{}, - }; - }, - 5 => { - if (message_length != 12) return error.InvalidMessageLength; - try reader.expectInt(u32, 5); - var salt_data = try reader.bytes(4); - defer salt_data.deinit(); - this.* = .{ - .MD5Password = .{ - .salt = salt_data.slice()[0..4].*, - }, - }; - }, - 7 => { - if (message_length != 8) return error.InvalidMessageLength; - this.* = .{ - .GSS = .{}, - }; - }, + 10 => { + if (message_length < 9) return error.InvalidMessageLength; + const remaining = message_length -| (8 - 1); + const bytes = try reader.read(remaining); + this.* = .{ + .SASL = .{ + .mechanisms = bytes, + }, + }; + }, - 8 => { - if (message_length < 9) return error.InvalidMessageLength; - const remaining = message_length -| (8 - 1); - const bytes = try reader.read(remaining); - this.* = .{ - .GSSContinue = .{ - .data = bytes, - }, - }; - }, - 9 => { - if (message_length != 8) return error.InvalidMessageLength; - this.* = .{ - .SSPI = .{}, - }; - }, + 11 => { + if (message_length < 9) return error.InvalidMessageLength; + const remaining = message_length -| (8 - 1); + const bytes = try reader.read(remaining); + this.* = .{ + .SASLContinue = .{ + .data = bytes, + }, + }; + }, - 10 => { - if (message_length < 9) return error.InvalidMessageLength; - const remaining = message_length -| (8 - 1); - const bytes = try reader.read(remaining); - this.* = .{ - .SASL = .{ - .mechanisms = bytes, - }, - }; - }, + 12 => { + if (message_length < 9) return error.InvalidMessageLength; + const remaining = message_length -| (8 - 1); + const bytes = try reader.read(remaining); + this.* = .{ + .SASLFinal = .{ + .data = bytes, + }, + }; + }, - 11 => { - if (message_length < 9) return error.InvalidMessageLength; - const remaining = message_length -| (8 - 1); - const bytes = try reader.read(remaining); - this.* = .{ - .SASLContinue = .{ - .data = bytes, - }, - }; - }, + else => { + this.* = .{ .Unknown = {} }; + }, + } + } - 12 => { - if (message_length < 9) return error.InvalidMessageLength; - const remaining = message_length -| (8 - 1); - const bytes = try reader.read(remaining); - this.* = .{ - .SASLFinal = .{ - .data = bytes, - }, - }; - }, + pub const decode = decoderWrap(Authentication, decodeInternal).decode; + }; - else => { - this.* = .{ .Unknown = {} }; - }, - } - } + pub const BackendKeyData = struct { + process_id: u32 = 0, + secret_key: u32 = 0, + pub const decode = decoderWrap(BackendKeyData, decodeInternal).decode; - pub const decode = decoderWrap(Authentication, decodeInternal).decode; - }; + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + try reader.expectInt(u32, 12); - pub const BackendKeyData = struct { - process_id: u32 = 0, - secret_key: u32 = 0, - pub const decode = decoderWrap(BackendKeyData, decodeInternal).decode; + this.* = .{ + .process_id = try reader.i32(), + .secret_key = try reader.i32(), + }; + } + }; - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - try reader.expectInt(u32, 12); + pub const ErrorResponse = struct { + messages: std.ArrayListUnmanaged(FieldMessage) = .{}, + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + var remaining_bytes = try reader.length(); + if (remaining_bytes < 4) return error.InvalidMessageLength; + remaining_bytes -|= 4; + + if (remaining_bytes > 0) { this.* = .{ - .process_id = try reader.i32(), - .secret_key = try reader.i32(), + .messages = try FieldMessage.decodeList(Container, reader), }; } - }; + } - pub const ErrorResponse = struct { - messages: std.ArrayListUnmanaged(FieldMessage) = .{}, + pub const decode = decoderWrap(ErrorResponse, decodeInternal).decode; + }; - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - var remaining_bytes = try reader.length(); - if (remaining_bytes < 4) return error.InvalidMessageLength; - remaining_bytes -|= 4; + pub const PortalOrPreparedStatement = union(enum) { + portal: []const u8, + prepared_statement: []const u8, - if (remaining_bytes > 0) { - this.* = .{ - .messages = try FieldMessage.decodeList(Container, reader), - }; - } - } + pub fn slice(this: @This()) []const u8 { + return switch (this) { + .portal => this.portal, + .prepared_statement => this.prepared_statement, + }; + } - pub const decode = decoderWrap(ErrorResponse, decodeInternal).decode; - }; + pub fn tag(this: @This()) u8 { + return switch (this) { + .portal => 'P', + .prepared_statement => 'S', + }; + } + }; - pub const PortalOrPreparedStatement = union(enum) { - portal: []const u8, - prepared_statement: []const u8, + /// Close (F) + /// Byte1('C') + /// - Identifies the message as a Close command. + /// Int32 + /// - Length of message contents in bytes, including self. + /// Byte1 + /// - 'S' to close a prepared statement; or 'P' to close a portal. + /// String + /// - The name of the prepared statement or portal to close (an empty string selects the unnamed prepared statement or portal). + pub const Close = struct { + p: PortalOrPreparedStatement, + + fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const p = this.p; + const count: u32 = @sizeOf((u32)) + 1 + p.slice().len + 1; + const header = [_]u8{ + 'C', + } ++ @byteSwap(count) ++ [_]u8{ + p.tag(), + }; + try writer.write(&header); + try writer.write(p.slice()); + try writer.write(&[_]u8{0}); + } - pub fn slice(this: @This()) []const u8 { - return switch (this) { - .portal => this.portal, - .prepared_statement => this.prepared_statement, - }; - } + pub const write = writeWrap(@This(), writeInternal); + }; - pub fn tag(this: @This()) u8 { - return switch (this) { - .portal => 'P', - .prepared_statement => 'S', - }; - } - }; + pub const CloseComplete = [_]u8{'3'} ++ toBytes(Int32(4)); + pub const EmptyQueryResponse = [_]u8{'I'} ++ toBytes(Int32(4)); + pub const Terminate = [_]u8{'X'} ++ toBytes(Int32(4)); - /// Close (F) - /// Byte1('C') - /// - Identifies the message as a Close command. - /// Int32 - /// - Length of message contents in bytes, including self. - /// Byte1 - /// - 'S' to close a prepared statement; or 'P' to close a portal. - /// String - /// - The name of the prepared statement or portal to close (an empty string selects the unnamed prepared statement or portal). - pub const Close = struct { - p: PortalOrPreparedStatement, - - fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const p = this.p; - const count: u32 = @sizeOf((u32)) + 1 + p.slice().len + 1; - const header = [_]u8{ - 'C', - } ++ @byteSwap(count) ++ [_]u8{ - p.tag(), - }; - try writer.write(&header); - try writer.write(p.slice()); - try writer.write(&[_]u8{0}); - } + fn Int32(value: anytype) [4]u8 { + return @byteSwap(@as(i32, @intCast(value))); + } - pub const write = writeWrap(@This(), writeInternal); - }; + const toBytes = std.mem.toBytes; - pub const CloseComplete = [_]u8{'3'} ++ toBytes(Int32(4)); - pub const EmptyQueryResponse = [_]u8{'I'} ++ toBytes(Int32(4)); - pub const Terminate = [_]u8{'X'} ++ toBytes(Int32(4)); + pub const TransactionStatusIndicator = enum(u8) { + /// if idle (not in a transaction block) + I = 'I', - fn Int32(value: anytype) [4]u8 { - return @byteSwap(@as(i32, @intCast(value))); - } + /// if in a transaction block + T = 'T', - const toBytes = std.mem.toBytes; + /// if in a failed transaction block + E = 'E', - pub const TransactionStatusIndicator = enum(u8) { - /// if idle (not in a transaction block) - I = 'I', + _, + }; - /// if in a transaction block - T = 'T', + pub const ReadyForQuery = struct { + status: TransactionStatusIndicator = .I, + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const length = try reader.length(); + std.debug.assert(length >= 4); - /// if in a failed transaction block - E = 'E', + if (length != 5) { + return error.InvalidMessageLength; + } - _, - }; + const status = try reader.bytes(1); + this.* = .{ + .status = @enumFromInt(status[0]), + }; + } - pub const ReadyForQuery = struct { - status: TransactionStatusIndicator = .I, - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - const length = try reader.length(); - std.debug.assert(length >= 4); + pub const decode = decoderWrap(ReadyForQuery, decodeInternal).decode; + }; - if (length != 5) { - return error.InvalidMessageLength; - } + pub const FormatCode = enum { + text, + binary, - const status = try reader.bytes(1); - this.* = .{ - .status = @enumFromInt(status[0]), - }; - } + pub fn from(value: i16) !FormatCode { + return switch (value) { + 0 => .text, + 1 => .binary, + else => error.UnknownFormatCode, + }; + } + }; - pub const decode = decoderWrap(ReadyForQuery, decodeInternal).decode; - }; + pub const DataRow = struct { + pub fn decode(context: anytype, reader: NewReader(@TypeOf(context)), comptime forEach: fn (@TypeOf(context), index: i16, bytes: ?*Data) anyerror!bool) anyerror!void { + var remaining_bytes = try reader.length(); + remaining_bytes -|= 4; - pub const FormatCode = enum { - text, - binary, + var remaining_fields = try reader.i16(); - pub fn from(value: i16) !FormatCode { - return switch (value) { - 0 => .text, - 1 => .binary, - else => error.UnknownFormatCode, - }; - } - }; + for (0..remaining_fields) |index| { + const byte_length = try reader.i32(); + switch (byte_length) { + 0 => break, + else => { + var bytes = try reader.bytes(@intCast(byte_length)); + if (!try forEach(context, index, &bytes)) break; + }, + + -1 => { + if (!try forEach(context, index, null)) break; + }, - pub const DataRow = struct { - pub fn decode(context: anytype, reader: NewReader(@TypeOf(context)), comptime forEach: fn (@TypeOf(context), index: i16, bytes: ?*Data) anyerror!bool) anyerror!void { - var remaining_bytes = try reader.length(); - remaining_bytes -|= 4; + std.math.minInt(i32)...-1 => { + return error.InvalidMessageLength; + }, + } + } + } + }; - var remaining_fields = try reader.i16(); + pub const BindComplete = [_]u8{'2'} ++ toBytes(Int32(4)); - for (0..remaining_fields) |index| { - const byte_length = try reader.i32(); - switch (byte_length) { - 0 => break, - else => { - var bytes = try reader.bytes(@intCast(byte_length)); - if (!try forEach(context, index, &bytes)) break; - }, + pub const FieldDescription = struct { + name: Data = .{ .empty = {} }, + table_oid: i32 = 0, + column_index: i16 = 0, + type_oid: i32 = 0, + type_size: i16 = 0, + type_modifier: i32 = 0, + format_code: FormatCode = FormatCode.binary, - -1 => { - if (!try forEach(context, index, null)) break; - }, + pub fn deinit(this: *@This()) void { + this.name.deinit(); + } - std.math.minInt(i32)...-1 => { - return error.InvalidMessageLength; - }, - } - } + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const name = try reader.readZ(); + errdefer { + name.deinit(); } - }; + this.* = .{ + .table_oid = try reader.i32(), + .column_index = try reader.i16(), + .type_oid = try reader.i32(), + .type_size = try reader.i16(), + .type_modifier = try reader.i32(), + .format_code = try FormatCode.from(try reader.i16()), + .name = try name.toOwned(), + }; + } - pub const BindComplete = [_]u8{'2'} ++ toBytes(Int32(4)); + pub const decode = decoderWrap(FieldDescription, decodeInternal).decode; + }; - pub const FieldDescription = struct { - name: Data = .{ .empty = {} }, - table_oid: i32 = 0, - column_index: i16 = 0, - type_oid: i32 = 0, - type_size: i16 = 0, - type_modifier: i32 = 0, - format_code: FormatCode = FormatCode.binary, + pub const RowDescription = struct { + fields: []FieldDescription = &[_]FieldDescription{}, - pub fn deinit(this: *@This()) void { - this.name.deinit(); - } + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + var remaining_bytes = try reader.length(); + remaining_bytes -|= 4; - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - const name = try reader.readZ(); - errdefer { - name.deinit(); + const field_count = try reader.i16(); + var fields = try bun.default_allocator.alloc(FieldDescription, field_count); + var remaining = fields; + errdefer { + for (fields[0 .. field_count - remaining.len]) |*field| { + field.deinit(); } - this.* = .{ - .table_oid = try reader.i32(), - .column_index = try reader.i16(), - .type_oid = try reader.i32(), - .type_size = try reader.i16(), - .type_modifier = try reader.i32(), - .format_code = try FormatCode.from(try reader.i16()), - .name = try name.toOwned(), - }; + + bun.default_allocator.free(fields); + } + while (remaining.len > 0) { + remaining[0] = try FieldDescription.decodeInternal(Container, reader); + remaining = remaining[1..]; } + this.* = .{ + .fields = fields, + }; + } - pub const decode = decoderWrap(FieldDescription, decodeInternal).decode; - }; + pub const decode = decoderWrap(RowDescription, decodeInternal).decode; + }; - pub const RowDescription = struct { - fields: []FieldDescription = &[_]FieldDescription{}, + pub const ParameterDescription = struct { + parameters: []i32 = &[_]i32{}, - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - var remaining_bytes = try reader.length(); - remaining_bytes -|= 4; + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + var remaining_bytes = try reader.length(); + remaining_bytes -|= 4; - const field_count = try reader.i16(); - var fields = try bun.default_allocator.alloc(FieldDescription, field_count); - var remaining = fields; - errdefer { - for (fields[0 .. field_count - remaining.len]) |*field| { - field.deinit(); - } + const count = try reader.i16(); + var parameters = try bun.default_allocator.alloc(i32, count); - bun.default_allocator.free(fields); - } - while (remaining.len > 0) { - remaining[0] = try FieldDescription.decodeInternal(Container, reader); - remaining = remaining[1..]; - } - this.* = .{ - .fields = fields, - }; + var data = try reader.read(count * @sizeOf((i32))); + defer data.deinit(); + const input_params: []align(1) const i32 = @ptrCast(data.slice()); + for (input_params, parameters) |src, *dest| { + dest.* = @byteSwap(src); } - pub const decode = decoderWrap(RowDescription, decodeInternal).decode; - }; + this.* = .{ + .parameters = parameters, + }; + } - pub const CommandComplete = struct { - command_tag: Data = .{ .empty = {} }, + pub const decode = decoderWrap(ParameterDescription, decodeInternal).decode; + }; - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - const length = try reader.length(); - std.debug.assert(length >= 4); + pub const NotificationResponse = struct { + pid: i32 = 0, + channel: bun.ByteList = .{}, + payload: bun.ByteList = .{}, - const tag = try reader.readZ(); - this.* = .{ - .command_tag = tag, - }; - } + pub fn deinit(this: *@This()) void { + this.channel.deinitWithAllocator(bun.default_allocator); + this.payload.deinitWithAllocator(bun.default_allocator); + } - pub const decode = decoderWrap(CommandComplete, decodeInternal).decode; - }; + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const length = try reader.length(); + std.debug.assert(length >= 4); - pub const Parse = struct { - name: bun.String = bun.String.empty, - query: bun.String = bun.String.empty, + this.* = .{ + .pid = try reader.i32(), + .channel = (try reader.readZ()).toOwned(), + .payload = (try reader.readZ()).toOwned(), + }; + } - /// Object IDs - paramters: std.ArrayListUnmanaged(i32) = .{}, + pub const decode = decoderWrap(NotificationResponse, decodeInternal).decode; + }; - pub fn deinit(this: *Parse) void { - this.name.deref(); - this.query.deref(); - this.paramters.deinit(bun.default_allocator); - } + pub const CommandComplete = struct { + command_tag: Data = .{ .empty = {} }, - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const parameters = this.paramters.items; - const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)); - const header = [_]u8{ - 'P', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(this.name); - try writer.string(this.query); - try writer.i16(@truncate(parameters.len)); - for (parameters) |parameter| { - try writer.i32(parameter); - } - } + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const length = try reader.length(); + std.debug.assert(length >= 4); - pub const write = writeWrap(@This(), writeInternal).write; - }; + const tag = try reader.readZ(); + this.* = .{ + .command_tag = tag, + }; + } - pub const ParseComplete = [_]u8{'1'} ++ toBytes(Int32(4)); + pub const decode = decoderWrap(CommandComplete, decodeInternal).decode; + }; - pub const PasswordMessage = struct { - password: Data = .{ .empty = {} }, + pub const Parse = struct { + name: bun.String = bun.String.empty, + query: bun.String = bun.String.empty, - pub fn deinit(this: *PasswordMessage) void { - this.password.deinit(); - } + /// Object IDs + paramters: std.ArrayListUnmanaged(i32) = .{}, - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const password = this.password.slice(); - const count: usize = @sizeOf((u32)) + password.len + 1; - const header = [_]u8{ - 'p', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(password); - } + pub fn deinit(this: *Parse) void { + this.name.deref(); + this.query.deref(); + this.paramters.deinit(bun.default_allocator); + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const parameters = this.paramters.items; + const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)); + const header = [_]u8{ + 'P', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(this.name); + try writer.string(this.query); + try writer.i16(@truncate(parameters.len)); + for (parameters) |parameter| { + try writer.i32(parameter); + } + } - pub const CopyData = struct { - data: Data = .{ .empty = {} }, + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - const length = try reader.length(); + pub const ParseComplete = [_]u8{'1'} ++ toBytes(Int32(4)); - const data = try reader.read(length -| 5); - this.* = .{ - .data = data, - }; - } + pub const PasswordMessage = struct { + password: Data = .{ .empty = {} }, - pub const decode = decoderWrap(CopyData, decodeInternal).decode; - - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const data = this.data.slice(); - const count: u32 = @sizeOf((u32)) + data.len + 1; - const header = [_]u8{ - 'd', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(data); - } + pub fn deinit(this: *PasswordMessage) void { + this.password.deinit(); + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const password = this.password.slice(); + const count: usize = @sizeOf((u32)) + password.len + 1; + const header = [_]u8{ + 'p', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(password); + } - pub const CopyDone = [_]u8{'c'} ++ toBytes(Int32(4)); - pub const Sync = [_]u8{'S'} ++ toBytes(Int32(4)); - pub const Flush = [_]u8{'H'} ++ toBytes(Int32(4)); - pub const SSLRequest = toBytes(Int32(8)) ++ toBytes(Int32(80877103)); - pub const NoData = [_]u8{'n'} ++ toBytes(Int32(4)); + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const SASLInitialResponse = struct { - mechanism: Data = .{ .empty = {} }, - data: Data = .{ .empty = {} }, + pub const CopyData = struct { + data: Data = .{ .empty = {} }, - pub fn deinit(this: *SASLInitialResponse) void { - this.mechanism.deinit(); - this.data.deinit(); - } + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + const length = try reader.length(); - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const mechanism = this.mechanism.slice(); - const data = this.data.slice(); - const count: usize = @sizeOf((u32)) + mechanism.len + 1 + data.len + 1; - const header = [_]u8{ - 'p', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(mechanism); - try writer.string(data); - } + const data = try reader.read(length -| 5); + this.* = .{ + .data = data, + }; + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const decode = decoderWrap(CopyData, decodeInternal).decode; - pub const SASLResponse = struct { - data: Data = .{ .empty = {} }, + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const data = this.data.slice(); + const count: u32 = @sizeOf((u32)) + data.len + 1; + const header = [_]u8{ + 'd', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(data); + } - pub fn deinit(this: *SASLResponse) void { - this.data.deinit(); - } + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const data = this.data.slice(); - const count: usize = @sizeOf((u32)) + data.len + 1; - const header = [_]u8{ - 'p', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(data); - } + pub const CopyDone = [_]u8{'c'} ++ toBytes(Int32(4)); + pub const Sync = [_]u8{'S'} ++ toBytes(Int32(4)); + pub const Flush = [_]u8{'H'} ++ toBytes(Int32(4)); + pub const SSLRequest = toBytes(Int32(8)) ++ toBytes(Int32(80877103)); + pub const NoData = [_]u8{'n'} ++ toBytes(Int32(4)); - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const SASLInitialResponse = struct { + mechanism: Data = .{ .empty = {} }, + data: Data = .{ .empty = {} }, - pub const StartupMessage = struct { - user: Data, - database: Data, - options: Data = Data{ .empty = {} }, - - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const user = this.user.slice(); - const database = this.database.slice(); - const options = this.options.slice(); - - const count: usize = @sizeOf((i32)) + @sizeOf((i32)) + user.len + 1 + database.len + 1 + options.len + 1; - - const header = toBytes(Int32(@truncate(count))); - try writer.write(&header); - try writer.i32(196608); - if (user.len > 0) - try writer.string(user); - - if (database.len == 0) { - // The database to connect to. Defaults to the user name. - try writer.string(user); - } else { - try writer.string(database); - } + pub fn deinit(this: *SASLInitialResponse) void { + this.mechanism.deinit(); + this.data.deinit(); + } - try writer.string(options); - try writer.write(&[_]u8{0}); - } + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const mechanism = this.mechanism.slice(); + const data = this.data.slice(); + const count: usize = @sizeOf((u32)) + mechanism.len + 1 + data.len + 1; + const header = [_]u8{ + 'p', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(mechanism); + try writer.string(data); + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const Execute = struct { - max_rows: i32 = 0, - p: PortalOrPreparedStatement, - - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const message = this.message.slice(); - const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1; - const header = [_]u8{ - 'E', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(message); - try writer.i32(this.max_rows); - } + pub const SASLResponse = struct { + data: Data = .{ .empty = {} }, - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub fn deinit(this: *SASLResponse) void { + this.data.deinit(); + } - pub const Describe = struct { - p: PortalOrPreparedStatement, - - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const message = this.message.slice(); - const count: u32 = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1; - const header = [_]u8{ - 'D', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.write(&[_]u8{ - this.p.tag(), - }); - try writer.string(message); - } + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const data = this.data.slice(); + const count: usize = @sizeOf((u32)) + data.len + 1; + const header = [_]u8{ + 'p', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(data); + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const Query = struct { - message: Data = .{ .empty = {} }, + pub const StartupMessage = struct { + user: Data, + database: Data, + options: Data = Data{ .empty = {} }, - pub fn deinit(this: *Query) void { - this.message.deinit(); + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const user = this.user.slice(); + const database = this.database.slice(); + const options = this.options.slice(); + + const count: usize = @sizeOf((i32)) + @sizeOf((i32)) + user.len + 1 + database.len + 1 + options.len + 1; + + const header = toBytes(Int32(@truncate(count))); + try writer.write(&header); + try writer.i32(196608); + if (user.len > 0) + try writer.string(user); + + if (database.len == 0) { + // The database to connect to. Defaults to the user name. + try writer.string(user); + } else { + try writer.string(database); } - pub fn writeInternal( - this: *const @This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const message = this.message.slice(); - const count: u32 = @sizeOf((u32)) + message.len + 1; - const header = [_]u8{ - 'Q', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(message); - } + try writer.string(options); + try writer.write(&[_]u8{0}); + } - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const NegotiateProtocolVersion = struct { - version: i32 = 0, - unrecognized_options: std.ArrayListUnmanaged(String) = .{}, + pub const Execute = struct { + max_rows: i32 = 0, + p: PortalOrPreparedStatement, - pub fn decodeInternal( - this: *@This(), - comptime Container: type, - reader: NewReader(Container), - ) !void { - const length = try reader.length(); - std.debug.assert(length >= 4); + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const message = this.message.slice(); + const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1; + const header = [_]u8{ + 'E', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(message); + try writer.i32(this.max_rows); + } - const version = try reader.i32(); - this.* = .{ - .version = version, - }; + pub const write = writeWrap(@This(), writeInternal).write; + }; - const unrecognized_options_count: u32 = @intCast(@max(try reader.i32(), 0)); - try this.unrecognized_options.ensureTotalCapacity(bun.default_allocator, unrecognized_options_count); - errdefer { - for (this.unrecognized_options.items) |*option| { - option.deinit(); - } - this.unrecognized_options.deinit(bun.default_allocator); - } - for (0..unrecognized_options_count) |_| { - var option = try reader.readZ(); - if (option.slice().len == 0) break; - defer option.deinit(); - this.unrecognized_options.appendAssumeCapacity( - String.fromUTF8(option), - ); - } - } - }; + pub const Describe = struct { + p: PortalOrPreparedStatement, - pub const NoticeResponse = struct { - messages: std.ArrayListUnmanaged(FieldMessage) = .{}, - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - var remaining_bytes = try reader.length(); - remaining_bytes -|= 4; + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const message = this.message.slice(); + const count: u32 = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1; + const header = [_]u8{ + 'D', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.write(&[_]u8{ + this.p.tag(), + }); + try writer.string(message); + } - if (remaining_bytes > 0) { - this.* = .{ - .messages = try FieldMessage.decodeList(Container, reader), - }; - } - } - pub const decode = decoderWrap(NoticeResponse, decodeInternal).decode; - }; + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const CopyFail = struct { - message: Data = .{ .empty = {} }, + pub const Query = struct { + message: Data = .{ .empty = {} }, - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - _ = try reader.i32(); + pub fn deinit(this: *@This()) void { + this.message.deinit(); + } - const message = try reader.readZ(); - this.* = .{ - .message = message, - }; - } + pub fn writeInternal( + this: *const @This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const message = this.message.slice(); + const count: u32 = @sizeOf((u32)) + message.len + 1; + const header = [_]u8{ + 'Q', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(message); + } - pub const decode = decoderWrap(CopyFail, decodeInternal).decode; - - pub fn writeInternal( - this: *@This(), - comptime Context: type, - writer: NewWriter(Context), - ) !void { - const message = this.message.slice(); - const count: u32 = @sizeOf((u32)) + message.len + 1; - const header = [_]u8{ - 'f', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(message); - } + pub const write = writeWrap(@This(), writeInternal).write; + }; - pub const write = writeWrap(@This(), writeInternal).write; - }; + pub const NegotiateProtocolVersion = struct { + version: i32 = 0, + unrecognized_options: std.ArrayListUnmanaged(String) = .{}, + + pub fn decodeInternal( + this: *@This(), + comptime Container: type, + reader: NewReader(Container), + ) !void { + const length = try reader.length(); + std.debug.assert(length >= 4); + + const version = try reader.i32(); + this.* = .{ + .version = version, + }; - pub const CopyInResponse = struct { - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - _ = reader; - _ = this; - TODO(@This()); + const unrecognized_options_count: u32 = @intCast(@max(try reader.i32(), 0)); + try this.unrecognized_options.ensureTotalCapacity(bun.default_allocator, unrecognized_options_count); + errdefer { + for (this.unrecognized_options.items) |*option| { + option.deinit(); + } + this.unrecognized_options.deinit(bun.default_allocator); } - - pub const decode = decoderWrap(CopyInResponse, decodeInternal).decode; - }; - - pub const CopyOutResponse = struct { - pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { - _ = reader; - _ = this; - TODO(@This()); + for (0..unrecognized_options_count) |_| { + var option = try reader.readZ(); + if (option.slice().len == 0) break; + defer option.deinit(); + this.unrecognized_options.appendAssumeCapacity( + String.fromUTF8(option), + ); } + } + }; - pub const decode = decoderWrap(CopyInResponse, decodeInternal).decode; - }; + pub const NoticeResponse = struct { + messages: std.ArrayListUnmanaged(FieldMessage) = .{}, + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + var remaining_bytes = try reader.length(); + remaining_bytes -|= 4; - fn TODO(comptime Type: type) !void { - std.debug.panic("TODO: not implemented {s}", .{bun.meta.typeBaseName(@typeName(Type))}); + if (remaining_bytes > 0) { + this.* = .{ + .messages = try FieldMessage.decodeList(Container, reader), + }; + } } + pub const decode = decoderWrap(NoticeResponse, decodeInternal).decode; }; - pub const types = struct { - pub const Tag = enum(i16) { - string = 25, - number = 0, - json = 114, - boolean = 16, - date = 1184, - datetime = 1114, - time = 1082, - bytea = 17, - bigint = 20, - _, - - fn toJSWithType( - tag: Tag, - globalObject: *JSC.JSGlobalObject, - comptime Type: type, - value: Type, - ) anyerror!JSC.JSValue { - switch (tag) { - .number => { - return number.toJS(globalObject, value); - }, + pub const CopyFail = struct { + message: Data = .{ .empty = {} }, - .json => { - return json.toJS(globalObject, value); - }, + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + _ = try reader.i32(); - .boolean => { - return boolean.toJS(globalObject, value); - }, + const message = try reader.readZ(); + this.* = .{ + .message = message, + }; + } - .date => { - return date.toJS(globalObject, value); - }, + pub const decode = decoderWrap(CopyFail, decodeInternal).decode; - .bytea => { - return bytea.toJS(globalObject, value); - }, + pub fn writeInternal( + this: *@This(), + comptime Context: type, + writer: NewWriter(Context), + ) !void { + const message = this.message.slice(); + const count: u32 = @sizeOf((u32)) + message.len + 1; + const header = [_]u8{ + 'f', + } ++ toBytes(Int32(count)); + try writer.write(&header); + try writer.string(message); + } - .bigint => { - return JSC.JSValue.fromInt64NoTruncate(globalObject, value); - }, + pub const write = writeWrap(@This(), writeInternal).write; + }; - else => { - return string.toJS(globalObject, value); - }, - } - } + pub const CopyInResponse = struct { + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + _ = reader; + _ = this; + TODO(@This()); + } - pub fn toJS( - tag: Tag, - globalObject: *JSC.JSGlobalObject, - value: anytype, - ) anyerror!JSC.JSValue { - return toJSWithType(tag, globalObject, @TypeOf(value), value); - } + pub const decode = decoderWrap(CopyInResponse, decodeInternal).decode; + }; - pub fn fromJS(globalObject: *JSC.JSGlobalObject, value: JSC.JSValue) anyerror!Tag { - if (value.isEmptyOrUndefinedOrNull()) { - return Tag.number; - } + pub const CopyOutResponse = struct { + pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void { + _ = reader; + _ = this; + TODO(@This()); + } - if (value.isCell()) { - const tag = value.jsType(); - if (tag.isStringLike()) { - return .string; - } + pub const decode = decoderWrap(CopyInResponse, decodeInternal).decode; + }; - if (tag == .JSDate) { - return .date; - } + fn TODO(comptime Type: type) !void { + std.debug.panic("TODO: not implemented {s}", .{bun.meta.typeBaseName(@typeName(Type))}); + } +}; - if (tag.isTypedArray()) { - return .bytea; - } +pub const types = struct { + pub const Tag = enum(i16) { + string = 25, + number = 0, + json = 114, + boolean = 16, + date = 1184, + datetime = 1114, + time = 1082, + bytea = 17, + bigint = 20, + _, + + fn toJSWithType( + tag: Tag, + globalObject: *JSC.JSGlobalObject, + comptime Type: type, + value: Type, + ) anyerror!JSC.JSValue { + switch (tag) { + .number => { + return number.toJS(globalObject, value); + }, + + .json => { + return json.toJS(globalObject, value); + }, + + .boolean => { + return boolean.toJS(globalObject, value); + }, + + .date => { + return date.toJS(globalObject, value); + }, + + .bytea => { + return bytea.toJS(globalObject, value); + }, + + .bigint => { + return JSC.JSValue.fromInt64NoTruncate(globalObject, value); + }, + + else => { + return string.toJS(globalObject, value); + }, + } + } - if (tag == .HeapBigInt) { - return .bigint; - } + pub fn toJS( + tag: Tag, + globalObject: *JSC.JSGlobalObject, + value: anytype, + ) anyerror!JSC.JSValue { + return toJSWithType(tag, globalObject, @TypeOf(value), value); + } - if (tag.isArrayLike() and value.getLength(globalObject) > 0) { - return Tag.fromJS(globalObject, value.getIndex(globalObject, 0)); - } + pub fn fromJS(globalObject: *JSC.JSGlobalObject, value: JSC.JSValue) anyerror!Tag { + if (value.isEmptyOrUndefinedOrNull()) { + return Tag.number; + } - if (!tag.isIndexable()) { - return error.JSError; - } + if (value.isCell()) { + const tag = value.jsType(); + if (tag.isStringLike()) { + return .string; } - if (value.isNumber()) { - return .number; + if (tag == .JSDate) { + return .date; } - if (value.isBoolean()) { - return .boolean; + if (tag.isTypedArray()) { + return .bytea; } - return Tag.number; - } - }; - - pub const string = struct { - pub const to = 25; - pub const from = [_]i16{}; - - pub fn toJSWithType( - globalThis: *JSC.JSGlobalObject, - comptime Type: type, - value: Type, - ) anyerror!JSC.JSValue { - switch (comptime Type) { - [:0]u8, []u8, []const u8, [:0]const u8 => { - var str = String.fromUTF8(value); - defer str.deinit(); - return str.toJS(globalThis); - }, - - bun.String => { - return value.toJS(globalThis); - }, + if (tag == .HeapBigInt) { + return .bigint; + } - *Data => { - var str = String.fromUTF8(value.slice()); - defer str.deinit(); - defer value.deinit(); - return str.toJS(globalThis); - }, + if (tag.isArrayLike() and value.getLength(globalObject) > 0) { + return Tag.fromJS(globalObject, value.getIndex(globalObject, 0)); + } - else => { - @compileError("unsupported type " ++ @typeName(Type)); - }, + if (!tag.isIndexable()) { + return error.JSError; } } - pub fn toJS( - globalThis: *JSC.JSGlobalObject, - value: anytype, - ) !JSC.JSValue { - var str = try toJSWithType(globalThis, @TypeOf(value), value); - defer str.deinit(); - return str.toJS(globalThis); + if (value.isNumber()) { + return .number; } - }; - pub const number = struct { - pub const to = 0; - pub const from = [_]i16{ 21, 23, 26, 700, 701 }; - - pub fn toJS( - _: *JSC.JSGlobalObject, - value: anytype, - ) anyerror!JSC.JSValue { - return JSC.JSValue.jsNumber(value); + if (value.isBoolean()) { + return .boolean; } - }; - pub const json = struct { - pub const to = 114; - pub const from = [_]i16{ 114, 3802 }; - - pub fn toJS( - globalObject: *JSC.JSGlobalObject, - value: *Data, - ) anyerror!JSC.JSValue { - defer value.deinit(); - var str = bun.String.fromUTF8(value.slice()); - defer str.deref(); - const parse_result = JSC.JSValue.parseJSON(str.toJS(globalObject), globalObject); - if (parse_result.isAnyError()) { - globalObject.throwValue(parse_result); - return error.JSError; - } + return Tag.number; + } + }; - return parse_result; + pub const string = struct { + pub const to = 25; + pub const from = [_]i16{}; + + pub fn toJSWithType( + globalThis: *JSC.JSGlobalObject, + comptime Type: type, + value: Type, + ) anyerror!JSC.JSValue { + switch (comptime Type) { + [:0]u8, []u8, []const u8, [:0]const u8 => { + var str = String.fromUTF8(value); + defer str.deinit(); + return str.toJS(globalThis); + }, + + bun.String => { + return value.toJS(globalThis); + }, + + *Data => { + var str = String.fromUTF8(value.slice()); + defer str.deinit(); + defer value.deinit(); + return str.toJS(globalThis); + }, + + else => { + @compileError("unsupported type " ++ @typeName(Type)); + }, } - }; + } + + pub fn toJS( + globalThis: *JSC.JSGlobalObject, + value: anytype, + ) !JSC.JSValue { + var str = try toJSWithType(globalThis, @TypeOf(value), value); + defer str.deinit(); + return str.toJS(globalThis); + } + }; - pub const boolean = struct { - pub const to = 16; - pub const from = [_]i16{16}; + pub const number = struct { + pub const to = 0; + pub const from = [_]i16{ 21, 23, 26, 700, 701 }; - pub fn toJS( - _: *JSC.JSGlobalObject, - value: bool, - ) anyerror!JSC.JSValue { - return JSC.JSValue.jsBoolean(value); + pub fn toJS( + _: *JSC.JSGlobalObject, + value: anytype, + ) anyerror!JSC.JSValue { + return JSC.JSValue.jsNumber(value); + } + }; + + pub const json = struct { + pub const to = 114; + pub const from = [_]i16{ 114, 3802 }; + + pub fn toJS( + globalObject: *JSC.JSGlobalObject, + value: *Data, + ) anyerror!JSC.JSValue { + defer value.deinit(); + var str = bun.String.fromUTF8(value.slice()); + defer str.deref(); + const parse_result = JSC.JSValue.parseJSON(str.toJS(globalObject), globalObject); + if (parse_result.isAnyError()) { + globalObject.throwValue(parse_result); + return error.JSError; } - }; - pub const date = struct { - pub const to = 1184; - pub const from = [_]i16{ 1082, 1114, 1184 }; + return parse_result; + } + }; - pub fn toJS( - globalObject: *JSC.JSGlobalObject, - value: *Data, - ) anyerror!JSC.JSValue { - defer value.deinit(); - return JSC.JSValue.fromDateString(globalObject, value.sliceZ().ptr); - } - }; + pub const boolean = struct { + pub const to = 16; + pub const from = [_]i16{16}; - pub const bytea = struct { - pub const to = 17; - pub const from = [_]i16{17}; - - pub fn toJS( - globalObject: *JSC.JSGlobalObject, - value: *Data, - ) anyerror!JSC.JSValue { - defer value.deinit(); - // var slice = value.slice()[@min(1, value.len)..]; - // _ = slice; - return JSC.JSValue.createBuffer(globalObject, value.slice(), null); - } - }; + pub fn toJS( + _: *JSC.JSGlobalObject, + value: bool, + ) anyerror!JSC.JSValue { + return JSC.JSValue.jsBoolean(value); + } + }; + + pub const date = struct { + pub const to = 1184; + pub const from = [_]i16{ 1082, 1114, 1184 }; + + pub fn toJS( + globalObject: *JSC.JSGlobalObject, + value: *Data, + ) anyerror!JSC.JSValue { + defer value.deinit(); + return JSC.JSValue.fromDateString(globalObject, value.sliceZ().ptr); + } + }; + + pub const bytea = struct { + pub const to = 17; + pub const from = [_]i16{17}; + + pub fn toJS( + globalObject: *JSC.JSGlobalObject, + value: *Data, + ) anyerror!JSC.JSValue { + defer value.deinit(); + // var slice = value.slice()[@min(1, value.len)..]; + // _ = slice; + return JSC.JSValue.createBuffer(globalObject, value.slice(), null); + } }; }; const Socket = uws.AnySocket; -const PreparedStatementsMap = std.HashMapUnmanaged(u64, *Statement, bun.IdentityContext(u64), 80); +const PreparedStatementsMap = std.HashMapUnmanaged(u64, *PostgresSQLStatement, bun.IdentityContext(u64), 80); -pub const SQLConnection = struct { +pub const PostgresSQLContext = struct { + tcp: ?*uws.SocketContext = null, + ssl: ?*uws.SocketContext = null, + + onQueryCompleteFn: JSC.Strong = .{}, + onStatementCompleteFn: JSC.Strong = .{}, +}; + +pub const PostgresSQLConnection = struct { socket: Socket, status: Status = Status.connecting, ref_count: u32 = 0, @@ -1268,6 +1323,22 @@ pub const SQLConnection = struct { globalObject: *JSC.JSGlobalObject, statements: PreparedStatementsMap, + has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + js_value: JSC.JSValue = JSC.JSValue.undefined, + + pub fn hasPendingActivity(this: *PostgresSQLConnection) callconv(.C) bool { + @fence(.Acquire); + return this.has_pending_activity.load(.Acquire); + } + + pub fn finalize(this: *PostgresSQLConnection) callconv(.C) void { + this.deref(); + } + + pub fn constructor(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) ?*PostgresSQLConnection { + _ = callframe; + _ = globalObject; + } pub fn ref(this: *@This()) void { std.debug.assert(this.ref_count > 0); @@ -1292,11 +1363,18 @@ pub const SQLConnection = struct { } } + pub fn doClose(this: *@This(), globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) void { + _ = globalObject; + this.disconnect(); + this.write_buffer.deinitWithAllocator(bun.default_allocator); + } + pub fn deinit(this: *@This()) void { var iter = this.statements.valueIterator(); while (iter.next()) |stmt| { stmt.connection = null; } + this.statements.deinit(bun.default_allocator); this.write_buffer.deinitWithAllocator(bun.default_allocator); bun.default_allocator.destroy(this); } @@ -1310,22 +1388,47 @@ pub const SQLConnection = struct { } }; -pub const SQLQuery = struct { - pub fn deinit(this: *SQLQuery) void { - _ = this; - } -}; - -pub const Statement = struct { - connection: ?*SQLConnection = null, +pub const PostgresSQLStatement = struct { + connection: ?*PostgresSQLConnection = null, name: []const u8 = "", + ref_count: u32 = 0, + poll_ref: JSC.PollRef = .{}, + has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), - pub fn deinit(this: *Statement) void { + pub fn hasPendingActivity(this: *PostgresSQLStatement) callconv(.C) bool { + return this.has_pending_activity.load(.Monotonic); + } + + pub fn deinit(this: *PostgresSQLStatement) void { if (this.connection) |conn| conn.deref(); + this.poll_ref.deactivate(JSC.VirtualMachine.get().event_loop_handle.?); bun.default_allocator.free(this.name); bun.default_allocator.destroy(this); } + + pub fn finalize(this: *PostgresSQLStatement) callconv(.C) void { + this.unref(); + } + + pub fn unref(this: *PostgresSQLStatement) void { + const ref_count = this.ref_count; + this.ref_count -= 1; + + if (ref_count == 1) { + this.deinit(); + } + } + + pub fn doRef(this: *PostgresSQLStatement, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) void { + _ = callframe; + this.poll_ref.ref(globalObject.bunVM()); + } + + pub fn doUnref(this: *PostgresSQLStatement, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) void { + _ = callframe; + this.poll_ref.unref(globalObject.bunVM()); + } }; pub const Host = struct { |