aboutsummaryrefslogtreecommitdiff
path: root/src/sql/postgres.zig
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2023-10-15 04:23:19 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2023-10-15 04:23:19 -0700
commitad7e90ae1bb840def28308c9c8e87ee292305be6 (patch)
tree4d321e35d68ce71dd8d0df9db3b7951a12446df9 /src/sql/postgres.zig
parentc508640e2c6f03ebdc666f696b81999a29eb73b2 (diff)
downloadbun-ad7e90ae1bb840def28308c9c8e87ee292305be6.tar.gz
bun-ad7e90ae1bb840def28308c9c8e87ee292305be6.tar.zst
bun-ad7e90ae1bb840def28308c9c8e87ee292305be6.zip
The startup message sends successfully
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r--src/sql/postgres.zig199
1 files changed, 115 insertions, 84 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig
index 203324445..df6a976b6 100644
--- a/src/sql/postgres.zig
+++ b/src/sql/postgres.zig
@@ -3,7 +3,11 @@ const JSC = bun.JSC;
const String = bun.String;
const uws = bun.uws;
const std = @import("std");
-const debug = bun.Output.scoped(.Postgres, true);
+const debug = bun.Output.scoped(.Postgres, false);
+const int32 = u32;
+const PostgresInt32 = int32;
+const short = u16;
+const PostgresShort = u16;
const Data = union(enum) {
owned: bun.ByteList,
temporary: []const u8,
@@ -144,7 +148,7 @@ pub const protocol = struct {
try pwriteFn(this.wrapped, data, i);
}
- pub fn @"i32"(this: @This(), value: i32) !void {
+ pub fn int32(this: @This(), value: PostgresInt32) !void {
try this.write(std.mem.asBytes(&@byteSwap(value)));
}
@@ -152,7 +156,7 @@ pub const protocol = struct {
try this.write(std.mem.asBytes(&@byteSwap(@as(u64, @bitCast(value)))));
}
- pub fn @"i16"(this: @This(), value: i16) !void {
+ pub fn short(this: @This(), value: PostgresShort) !void {
try this.write(std.mem.asBytes(&@byteSwap(value)));
}
@@ -173,7 +177,7 @@ pub const protocol = struct {
}
pub fn @"null"(this: @This()) !void {
- try this.i32(-1);
+ try this.int32(std.math.maxInt(PostgresInt32));
}
pub fn String(this: @This(), value: bun.String) !void {
@@ -393,21 +397,21 @@ pub const protocol = struct {
pub fn expectInt(this: @This(), comptime Int: type, comptime value: comptime_int) !bool {
var actual = try this.int(Int);
- return actual != value;
+ return actual == value;
}
- pub fn @"i32"(this: @This()) !i32 {
- return this.int(i32);
+ pub fn int32(this: @This()) !PostgresInt32 {
+ return this.int(PostgresInt32);
}
- pub fn @"i16"(this: @This()) !i16 {
- return this.int(i16);
+ pub fn short(this: @This()) !PostgresShort {
+ return this.int(PostgresShort);
}
- pub fn length(this: @This()) !i32 {
- const expected = try this.int(i32);
+ pub fn length(this: @This()) !PostgresInt32 {
+ const expected = try this.int(PostgresInt32);
if (expected > -1) {
- try this.ensureCapacity(@intCast(expected));
+ try this.ensureCapacity(@intCast(expected -| 4));
}
return expected;
@@ -483,7 +487,7 @@ pub const protocol = struct {
pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void {
const message_length = try reader.length();
- switch (try reader.i32()) {
+ switch (try reader.int32()) {
0 => {
if (message_length != 8) return error.InvalidMessageLength;
this.* = .{ .Ok = {} };
@@ -610,12 +614,12 @@ pub const protocol = struct {
pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void {
if (!try reader.expectInt(u32, 12)) {
- return error.InvalidMessage;
+ return error.InvalidBackendKeyData;
}
this.* = .{
- .process_id = @bitCast(try reader.i32()),
- .secret_key = @bitCast(try reader.i32()),
+ .process_id = @bitCast(try reader.int32()),
+ .secret_key = @bitCast(try reader.int32()),
};
}
};
@@ -701,7 +705,7 @@ pub const protocol = struct {
pub const Terminate = [_]u8{'X'} ++ toBytes(Int32(4));
fn Int32(value: anytype) [4]u8 {
- return @bitCast(@byteSwap(@as(i32, @intCast(value))));
+ return @bitCast(@byteSwap(@as(int32, @intCast(value))));
}
const toBytes = std.mem.toBytes;
@@ -742,7 +746,7 @@ pub const protocol = struct {
text,
binary,
- pub fn from(value: i16) !FormatCode {
+ pub fn from(value: short) !FormatCode {
return switch (value) {
0 => .text,
1 => .binary,
@@ -756,24 +760,16 @@ pub const protocol = struct {
var remaining_bytes = try reader.length();
remaining_bytes -|= 4;
- var remaining_fields: usize = @intCast(@max(try reader.i16(), 0));
+ var remaining_fields: usize = @intCast(@max(try reader.short(), 0));
for (0..remaining_fields) |index| {
- const byte_length = try reader.i32();
+ const byte_length = try reader.int32();
switch (byte_length) {
0 => break,
else => {
var bytes = try reader.bytes(@intCast(byte_length));
if (!try forEach(context, @intCast(index), &bytes)) break;
},
-
- -1 => {
- if (!try forEach(context, @intCast(index), null)) break;
- },
-
- std.math.minInt(i32)...-2 => {
- return error.InvalidMessageLength;
- },
}
}
}
@@ -783,9 +779,9 @@ pub const protocol = struct {
pub const FieldDescription = struct {
name: Data = .{ .empty = {} },
- table_oid: i32 = 0,
- column_index: i16 = 0,
- type_oid: i16 = 0,
+ table_oid: int32 = 0,
+ column_index: short = 0,
+ type_oid: short = 0,
pub fn deinit(this: *@This()) void {
this.name.deinit();
@@ -797,9 +793,9 @@ pub const protocol = struct {
name.deinit();
}
this.* = .{
- .table_oid = try reader.i32(),
- .column_index = try reader.i16(),
- .type_oid = @truncate(try reader.i32()),
+ .table_oid = try reader.int32(),
+ .column_index = try reader.short(),
+ .type_oid = @truncate(try reader.int32()),
.name = .{ .owned = try name.toOwned() },
};
@@ -823,7 +819,7 @@ pub const protocol = struct {
var remaining_bytes = try reader.length();
remaining_bytes -|= 4;
- const field_count: usize = @intCast(@max(try reader.i16(), 0));
+ const field_count: usize = @intCast(@max(try reader.short(), 0));
var fields = try bun.default_allocator.alloc(
FieldDescription,
field_count,
@@ -849,18 +845,18 @@ pub const protocol = struct {
};
pub const ParameterDescription = struct {
- parameters: []i32 = &[_]i32{},
+ parameters: []int32 = &[_]int32{},
pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void {
var remaining_bytes = try reader.length();
remaining_bytes -|= 4;
- const count = try reader.i16();
- var parameters = try bun.default_allocator.alloc(i32, @intCast(@max(count, 0)));
+ const count = try reader.short();
+ var parameters = try bun.default_allocator.alloc(int32, @intCast(@max(count, 0)));
- var data = try reader.read(@as(usize, @intCast(@max(count, 0))) * @sizeOf((i32)));
+ var data = try reader.read(@as(usize, @intCast(@max(count, 0))) * @sizeOf((int32)));
defer data.deinit();
- const input_params: []align(1) const i32 = toInt32Slice(i32, data.slice());
+ const input_params: []align(1) const int32 = toInt32Slice(int32, data.slice());
for (input_params, parameters) |src, *dest| {
dest.* = @byteSwap(src);
}
@@ -879,7 +875,7 @@ pub const protocol = struct {
}
pub const NotificationResponse = struct {
- pid: i32 = 0,
+ pid: int32 = 0,
channel: bun.ByteList = .{},
payload: bun.ByteList = .{},
@@ -893,7 +889,7 @@ pub const protocol = struct {
std.debug.assert(length >= 4);
this.* = .{
- .pid = try reader.i32(),
+ .pid = try reader.int32(),
.channel = (try reader.readZ()).toOwned(),
.payload = (try reader.readZ()).toOwned(),
};
@@ -925,7 +921,7 @@ pub const protocol = struct {
pub const Parse = struct {
name: []const u8 = "",
query: []const u8 = "",
- params: []const i32 = &.{},
+ params: []const int32 = &.{},
pub fn deinit(this: *Parse) void {
_ = this;
@@ -944,9 +940,9 @@ pub const protocol = struct {
try writer.write(&header);
try writer.string(this.name);
try writer.string(this.query);
- try writer.i16(@intCast(parameters.len));
+ try writer.short(@intCast(parameters.len));
for (parameters) |parameter| {
- try writer.i32(parameter);
+ try writer.int32(parameter);
}
}
@@ -1082,14 +1078,18 @@ pub const protocol = struct {
const database = this.database.slice();
const options = this.options.slice();
- const count: usize = @sizeOf((i32)) + @sizeOf((i32)) + user.len + 1 + database.len + 1 + options.len + 1;
+ const count: usize = @sizeOf((int32)) + @sizeOf((int32)) + zCount("user", user) + zCount("database", database) + zCount("client_encoding", "UTF8") + zCount("", options) + 1;
const header = toBytes(Int32(@as(u32, @truncate(count))));
try writer.write(&header);
- try writer.i32(196608);
+ try writer.int32(196608);
+
+ try writer.string("user");
if (user.len > 0)
try writer.string(user);
+ try writer.string("database");
+
if (database.len == 0) {
// The database to connect to. Defaults to the user name.
try writer.string(user);
@@ -1097,15 +1097,32 @@ pub const protocol = struct {
try writer.string(database);
}
- try writer.string(options);
+ try writer.string("client_encoding");
+ try writer.string("UTF8");
+
+ if (options.len > 0)
+ try writer.string(options);
+
try writer.write(&[_]u8{0});
}
pub const write = writeWrap(@This(), writeInternal).write;
};
+ fn zCount(prefix: []const u8, slice: []const u8) usize {
+ if (slice.len > 0) {
+ return slice.len + 1 + prefix.len + 1;
+ }
+
+ if (prefix.len > 0) {
+ return prefix.len + 1;
+ }
+
+ return 0;
+ }
+
pub const Execute = struct {
- max_rows: i32 = 0,
+ max_rows: int32 = 0,
p: PortalOrPreparedStatement,
pub fn writeInternal(
@@ -1120,7 +1137,7 @@ pub const protocol = struct {
} ++ toBytes(Int32(count));
try writer.write(&header);
try writer.string(message);
- try writer.i32(this.max_rows);
+ try writer.int32(this.max_rows);
}
pub const write = writeWrap(@This(), writeInternal).write;
@@ -1174,7 +1191,7 @@ pub const protocol = struct {
};
pub const NegotiateProtocolVersion = struct {
- version: i32 = 0,
+ version: int32 = 0,
unrecognized_options: std.ArrayListUnmanaged(String) = .{},
pub fn decodeInternal(
@@ -1185,12 +1202,12 @@ pub const protocol = struct {
const length = try reader.length();
std.debug.assert(length >= 4);
- const version = try reader.i32();
+ const version = try reader.int32();
this.* = .{
.version = version,
};
- const unrecognized_options_count: u32 = @intCast(@max(try reader.i32(), 0));
+ const unrecognized_options_count: u32 = @intCast(@max(try reader.int32(), 0));
try this.unrecognized_options.ensureTotalCapacity(bun.default_allocator, unrecognized_options_count);
errdefer {
for (this.unrecognized_options.items) |*option| {
@@ -1234,7 +1251,7 @@ pub const protocol = struct {
message: Data = .{ .empty = {} },
pub fn decodeInternal(this: *@This(), comptime Container: type, reader: NewReader(Container)) !void {
- _ = try reader.i32();
+ _ = try reader.int32();
const message = try reader.readZ();
this.* = .{
@@ -1287,7 +1304,7 @@ pub const protocol = struct {
};
pub const types = struct {
- pub const Tag = enum(i16) {
+ pub const Tag = enum(short) {
string = 25,
number = 0,
json = 114,
@@ -1390,7 +1407,7 @@ pub const types = struct {
pub const string = struct {
pub const to = 25;
- pub const from = [_]i16{};
+ pub const from = [_]short{};
pub fn toJSWithType(
globalThis: *JSC.JSGlobalObject,
@@ -1433,7 +1450,7 @@ pub const types = struct {
pub const number = struct {
pub const to = 0;
- pub const from = [_]i16{ 21, 23, 26, 700, 701 };
+ pub const from = [_]short{ 21, 23, 26, 700, 701 };
pub fn toJS(
_: *JSC.JSGlobalObject,
@@ -1445,7 +1462,7 @@ pub const types = struct {
pub const json = struct {
pub const to = 114;
- pub const from = [_]i16{ 114, 3802 };
+ pub const from = [_]short{ 114, 3802 };
pub fn toJS(
globalObject: *JSC.JSGlobalObject,
@@ -1466,7 +1483,7 @@ pub const types = struct {
pub const boolean = struct {
pub const to = 16;
- pub const from = [_]i16{16};
+ pub const from = [_]short{16};
pub fn toJS(
_: *JSC.JSGlobalObject,
@@ -1478,7 +1495,7 @@ pub const types = struct {
pub const date = struct {
pub const to = 1184;
- pub const from = [_]i16{ 1082, 1114, 1184 };
+ pub const from = [_]short{ 1082, 1114, 1184 };
pub fn toJS(
globalObject: *JSC.JSGlobalObject,
@@ -1491,7 +1508,7 @@ pub const types = struct {
pub const bytea = struct {
pub const to = 17;
- pub const from = [_]i16{17};
+ pub const from = [_]short{17};
pub fn toJS(
globalObject: *JSC.JSGlobalObject,
@@ -1777,18 +1794,18 @@ pub const PostgresRequest = struct {
) !void {
try writer.bytes("B");
const length_offset = writer.offset();
- try writer.i32(0);
+ try writer.int32(0);
try writer.string(name);
try writer.String(cursor_name);
var iter = JSC.JSArrayIterator.init(values_array, globalObject);
- try writer.i16(@intCast(iter.len));
+ try writer.short(@intCast(iter.len));
while (iter.next()) |value| {
if (value.isUndefinedOrNull()) {
- try writer.i16(0);
+ try writer.short(0);
continue;
}
@@ -1796,15 +1813,15 @@ pub const PostgresRequest = struct {
switch (tag) {
.bytea, .number => {
- try writer.i16(0);
+ try writer.short(0);
},
else => {
- try writer.i16(1);
+ try writer.short(1);
},
}
}
- try writer.i16(@intCast(iter.len));
+ try writer.short(@intCast(iter.len));
iter = JSC.JSArrayIterator.init(values_array, globalObject);
@@ -1813,7 +1830,7 @@ pub const PostgresRequest = struct {
while (iter.next()) |value| {
if (value.isUndefinedOrNull()) {
debug(" -> NULL", .{});
- try writer.i32(4);
+ try writer.int32(4);
try writer.null();
continue;
}
@@ -1823,10 +1840,10 @@ pub const PostgresRequest = struct {
.number => {
debug(" -> {s}", .{@tagName(tag)});
if (value.isInt32()) {
- try writer.i32(4);
- try writer.i32(value.to(i32));
+ try writer.int32(4);
+ try writer.int32(value.to(int32));
} else {
- try writer.i32(8);
+ try writer.int32(8);
try writer.f64(value.coerceToDouble(globalObject));
}
},
@@ -1852,7 +1869,7 @@ pub const PostgresRequest = struct {
if (value.asArrayBuffer(globalObject)) |buf| {
bytes = buf.byteSlice();
}
- try writer.i32(@intCast(bytes.len));
+ try writer.int32(@intCast(bytes.len));
debug(" -> {s}: {d}", .{ @tagName(tag), bytes.len });
try writer.bytes(bytes);
@@ -1866,13 +1883,13 @@ pub const PostgresRequest = struct {
}
}
- try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(i32, @intCast(writer.offset())))), length_offset);
+ try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(int32, @intCast(writer.offset())))), length_offset);
}
pub fn writeQuery(
query: []const u8,
name: []const u8,
- params: []const i32,
+ params: []const int32,
comptime Context: type,
writer: protocol.NewWriter(Context),
) !void {
@@ -2108,6 +2125,8 @@ pub const PostgresSQLConnection = struct {
socket.close();
this.fail("Failed to write startup message", err);
};
+
+ this.flushData();
}
pub fn onTimeout(this: *PostgresSQLConnection) void {
@@ -2130,6 +2149,11 @@ pub const PostgresSQLConnection = struct {
this.read_buffer.byte_list.len = 0;
this.read_buffer.write(bun.default_allocator, data[offset..]) catch @panic("failed to write to read buffer");
} else {
+ if (comptime bun.Environment.allow_assert) {
+ if (@errorReturnTrace()) |trace| {
+ debug("Error: {s}\n{}", .{ @errorName(err), trace });
+ }
+ }
this.fail("Failed to read data", err);
}
};
@@ -2140,6 +2164,11 @@ pub const PostgresSQLConnection = struct {
this.read_buffer.write(bun.default_allocator, data) catch @panic("failed to write to read buffer");
PostgresRequest.onData(this, Reader, this.bufferedReader()) catch |err| {
if (err != error.ShortRead) {
+ if (comptime bun.Environment.allow_assert) {
+ if (@errorReturnTrace()) |trace| {
+ debug("Error: {s}\n{}", .{ @errorName(err), trace });
+ }
+ }
this.fail("Failed to read data", err);
}
return;
@@ -2210,17 +2239,17 @@ pub const PostgresSQLConnection = struct {
.statements = PreparedStatementsMap{},
};
- ptr.socket = socket: {
+ {
const hostname = hostname_str.toUTF8(bun.default_allocator);
defer hostname.deinit();
if (tls_object.isEmptyOrUndefinedOrNull()) {
var ctx = vm.rareData().postgresql_context.tcp orelse brk: {
var ctx_ = uws.us_create_bun_socket_context(0, vm.event_loop_handle, @sizeOf(*PostgresSQLConnection), uws.us_bun_socket_context_options_t{}).?;
- uws.NewSocketHandler(false).configure(ctx_, false, PostgresSQLConnection, SocketHandler(false));
+ uws.NewSocketHandler(false).configure(ctx_, true, *PostgresSQLConnection, SocketHandler(false));
vm.rareData().postgresql_context.tcp = ctx_;
break :brk ctx_;
};
- break :socket Socket{
+ ptr.socket = .{
.SocketTCP = uws.SocketTCP.connectAnon(hostname.slice(), port, ctx, ptr) orelse {
globalObject.throwError(error.ConnectionFailed, "failed to connect to postgresql");
ptr.deinit();
@@ -2233,7 +2262,9 @@ pub const PostgresSQLConnection = struct {
ptr.deinit();
return null;
}
- };
+ }
+ ptr.updateHasPendingActivity();
+ ptr.poll_ref.ref(vm);
return ptr;
}
@@ -2447,10 +2478,10 @@ pub const PostgresSQLConnection = struct {
putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNull());
},
2 => {
- putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(i32, @as(i16, @bitCast(bytes[0..2].*)))));
+ putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(int32, @as(short, @bitCast(bytes[0..2].*)))));
},
4 => {
- putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(i32, @bitCast(bytes[0..4].*))));
+ putDirectOffset(this.object, this.vm, index, JSC.JSValue.jsNumber(@as(int32, @bitCast(bytes[0..4].*))));
},
else => {
var eight: usize = 0;
@@ -2664,7 +2695,7 @@ pub const PostgresSQLStatement = struct {
cached_structure: JSC.Strong = .{},
ref_count: u32 = 1,
fields: []const protocol.FieldDescription = &[_]protocol.FieldDescription{},
- parameters: []const i32 = &[_]i32{},
+ parameters: []const int32 = &[_]int32{},
signature: Signature,
pub fn ref(this: *@This()) void {
std.debug.assert(this.ref_count > 0);
@@ -2717,7 +2748,7 @@ pub const PostgresSQLStatement = struct {
};
const Signature = struct {
- fields: []const i32,
+ fields: []const int32,
name: []const u8,
query: []const u8,
@@ -2736,7 +2767,7 @@ const Signature = struct {
}
pub fn generate(globalObject: *JSC.JSGlobalObject, query: []const u8, array_value: JSC.JSValue) !Signature {
- var fields = std.ArrayList(i32).init(bun.default_allocator);
+ var fields = std.ArrayList(int32).init(bun.default_allocator);
var name = try std.ArrayList(u8).initCapacity(bun.default_allocator, query.len);
errdefer {
@@ -2748,7 +2779,7 @@ const Signature = struct {
while (iter.next()) |value| {
if (value.isUndefinedOrNull()) {
- try fields.append(@byteSwap(@as(i32, -1)));
+ try fields.append(@byteSwap(@as(int32, std.math.maxInt(int32))));
try name.appendSlice(".null");
continue;
}