aboutsummaryrefslogtreecommitdiff
path: root/src/sql/postgres.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r--src/sql/postgres.zig151
1 files changed, 104 insertions, 47 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig
index 4352f7db3..7c561cf7e 100644
--- a/src/sql/postgres.zig
+++ b/src/sql/postgres.zig
@@ -140,10 +140,30 @@ pub const protocol = struct {
const offsetFn = offsetFn_;
pub const Ctx = Context;
+ pub const WrappedWriter = @This();
+
pub inline fn write(this: @This(), data: []const u8) anyerror!void {
try writeFn(this.wrapped, data);
}
+ pub const LengthWriter = struct {
+ index: usize,
+ context: WrappedWriter,
+
+ pub fn write(this: LengthWriter) anyerror!void {
+ try this.context.pwrite(&Int32(this.context.offset() - this.index), this.index);
+ }
+ };
+
+ pub inline fn length(this: @This()) anyerror!LengthWriter {
+ const i = this.offset();
+ try this.int32(0);
+ return LengthWriter{
+ .index = i,
+ .context = this,
+ };
+ }
+
pub inline fn offset(this: @This()) usize {
return offsetFn(this.wrapped);
}
@@ -802,7 +822,7 @@ pub const protocol = struct {
.name = .{ .owned = try name.toOwned() },
};
- try reader.skip(12);
+ try reader.skip(2 + 4 + 2);
}
pub const decode = decoderWrap(FieldDescription, decodeInternal).decode;
@@ -936,7 +956,7 @@ pub const protocol = struct {
writer: NewWriter(Context),
) !void {
const parameters = this.params;
- const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)) + zCount(this.name) + zCount(this.query);
+ const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)) + @max(zCount(this.name), 1) + @max(zCount(this.query), 1);
const header = [_]u8{
'P',
} ++ toBytes(Int32(count));
@@ -1125,7 +1145,7 @@ pub const protocol = struct {
}
pub const Execute = struct {
- max_rows: int32 = std.math.maxInt(int32),
+ max_rows: int32 = 0,
p: PortalOrPreparedStatement,
pub fn writeInternal(
@@ -1133,14 +1153,14 @@ pub const protocol = struct {
comptime Context: type,
writer: NewWriter(Context),
) !void {
- const message = this.p.slice();
- const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1;
- const header = [_]u8{
- 'E',
- } ++ toBytes(Int32(count));
- try writer.write(&header);
- try writer.string(message);
+ try writer.write("E");
+ const length = try writer.length();
+ if (this.p == .portal)
+ try writer.string(this.p.portal)
+ else
+ try writer.write(&[_]u8{0});
try writer.int32(this.max_rows);
+ try length.write();
}
pub const write = writeWrap(@This(), writeInternal).write;
@@ -1155,16 +1175,15 @@ pub const protocol = struct {
writer: NewWriter(Context),
) !void {
const message = this.p.slice();
- const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 2;
- const header = [_]u8{
+ try writer.write(&[_]u8{
'D',
- } ++ toBytes(Int32(count));
- try writer.write(&header);
+ });
+ const length = try writer.length();
try writer.write(&[_]u8{
this.p.tag(),
- 0,
});
try writer.string(message);
+ try length.write();
}
pub const write = writeWrap(@This(), writeInternal).write;
@@ -1564,13 +1583,17 @@ pub const PostgresSQLQuery = struct {
pub usingnamespace JSC.Codegen.JSPostgresSQLQuery;
- pub const Status = enum {
+ pub const Status = enum(u8) {
pending,
running,
success,
fail,
};
+ pub fn hasPendingActivity(this: *@This()) callconv(.C) bool {
+ return this.status == .running;
+ }
+
pub fn deinit(this: *@This()) void {
if (this.statement) |statement| {
statement.deref();
@@ -1601,7 +1624,9 @@ pub const PostgresSQLQuery = struct {
}
pub fn onNoData(this: *@This(), globalObject: *JSC.JSGlobalObject) void {
- if (this.thisValue == .zero) {
+ const thisValue = this.thisValue;
+ const targetValue = this.target;
+ if (thisValue == .zero) {
this.deref();
return;
}
@@ -1611,14 +1636,16 @@ pub const PostgresSQLQuery = struct {
// TODO: error handling
_ = vm.rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis(
globalObject,
- this.thisValue,
+ targetValue,
&[_]JSC.JSValue{
JSC.JSValue.undefined,
},
);
}
pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void {
- if (this.thisValue == .zero) {
+ const thisValue = this.thisValue;
+ const targetValue = this.target;
+ if (thisValue == .zero) {
this.deref();
return;
}
@@ -1638,14 +1665,15 @@ pub const PostgresSQLQuery = struct {
_ = b.append(str.slice());
_ = b.append("\n");
}
- const instance = globalObject.createSyntaxErrorInstance("Postgres error occurred\n{s}", .{b.allocatedSlice()});
+ const instance = globalObject.createSyntaxErrorInstance("Postgres error occurred\n{s}", .{b.allocatedSlice()[0..b.len]});
+ this.status = .fail;
b.deinit(bun.default_allocator);
this.deref();
// TODO: error handling
_ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryRejectFn.get().?.callWithThis(
globalObject,
- this.thisValue,
+ targetValue,
&[_]JSC.JSValue{
instance,
},
@@ -1653,17 +1681,21 @@ pub const PostgresSQLQuery = struct {
}
pub fn onSuccess(this: *@This(), _: []const u8, globalObject: *JSC.JSGlobalObject) void {
- if (this.thisValue == .zero) {
+ const thisValue = this.thisValue;
+ const targetValue = this.target;
+ if (thisValue == .zero) {
this.deref();
return;
}
- const pending_value = PostgresSQLQuery.pendingValueGetCached(this.thisValue) orelse JSC.JSValue.undefined;
+ const pending_value = PostgresSQLQuery.pendingValueGetCached(thisValue) orelse JSC.JSValue.undefined;
+ this.status = .success;
this.deref();
+
// TODO: error handling
_ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis(
globalObject,
- this.thisValue,
+ targetValue,
&[_]JSC.JSValue{
pending_value,
},
@@ -1768,6 +1800,7 @@ pub const PostgresSQLQuery = struct {
connection.requests.writeItem(this) catch {};
this.ref();
+ this.status = .running;
return .undefined;
}
@@ -1796,40 +1829,44 @@ pub const PostgresRequest = struct {
comptime Context: type,
writer: protocol.NewWriter(Context),
) !void {
- try writer.bytes("B");
- const length_offset = writer.offset();
- try writer.int32(0);
+ try writer.write("B");
+ const length = try writer.length();
- try writer.string(name);
try writer.String(cursor_name);
+ try writer.string(name);
var iter = JSC.JSArrayIterator.init(values_array, globalObject);
- try writer.short(@intCast(iter.len));
+ if (iter.len > 0) {
+ try writer.short(@intCast(iter.len));
- while (iter.next()) |value| {
- if (value.isUndefinedOrNull()) {
- try writer.short(0);
- continue;
- }
+ while (iter.next()) |value| {
+ if (value.isUndefinedOrNull()) {
+ try writer.short(0);
+ continue;
+ }
- const tag = try types.Tag.fromJS(globalObject, value);
+ const tag = try types.Tag.fromJS(globalObject, value);
- switch (tag) {
- .bytea, .number => {
- try writer.short(0);
- },
- else => {
- try writer.short(1);
- },
+ switch (tag) {
+ .bytea, .number => {
+ try writer.short(0);
+ },
+ else => {
+ try writer.short(1);
+ },
+ }
}
- }
- try writer.short(@intCast(iter.len));
+ try writer.short(@intCast(iter.len));
+ } else {
+ try writer.short(0);
+ try writer.short(0);
+ }
iter = JSC.JSArrayIterator.init(values_array, globalObject);
- debug("Bind: {} ({d})", .{ bun.strings.QuotedFormatter{ .text = name }, iter.len });
+ debug("Bind: {} ({d} args)", .{ bun.strings.QuotedFormatter{ .text = name }, iter.len });
while (iter.next()) |value| {
if (value.isUndefinedOrNull()) {
@@ -1887,7 +1924,9 @@ pub const PostgresRequest = struct {
}
}
- try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(int32, @intCast(writer.offset())))), length_offset);
+ try writer.short(0);
+
+ try length.write();
}
pub fn writeQuery(
@@ -2069,6 +2108,8 @@ pub const PostgresSQLConnection = struct {
.connected => {
const on_connect = this.on_connect.swap();
if (on_connect == .zero) return;
+ this.poll_ref.unref(this.globalObject.bunVM());
+ this.updateHasPendingActivity();
_ = on_connect.callWithThis(
this.globalObject,
this.js_value,
@@ -2123,6 +2164,7 @@ pub const PostgresSQLConnection = struct {
this.socket = socket;
this.poll_ref.ref(this.globalObject.bunVM());
+ this.updateHasPendingActivity();
var msg = protocol.StartupMessage{ .user = Data{ .temporary = this.user }, .database = Data{ .temporary = this.database }, .options = Data{ .temporary = this.options } };
msg.writeInternal(Writer, this.writer()) catch |err| {
@@ -2400,7 +2442,7 @@ pub const PostgresSQLConnection = struct {
}
pub fn offset(this: Writer) usize {
- return this.connection.write_buffer.head;
+ return this.connection.write_buffer.len();
}
};
@@ -2586,6 +2628,7 @@ pub const PostgresSQLConnection = struct {
}
debug("-> {s}", .{cmd.command_tag.slice()});
_ = this.requests.discard(1);
+ this.updateRef();
request.onSuccess(cmd.command_tag.slice(), this.globalObject);
},
.BindComplete => {
@@ -2621,6 +2664,8 @@ pub const PostgresSQLConnection = struct {
try reader.eatMessage(protocol.NoData);
var request = this.current() orelse return error.ExpectedRequest;
_ = this.requests.discard(1);
+ this.updateRef();
+
request.onNoData(this.globalObject);
},
.BackendKeyData => {
@@ -2634,6 +2679,8 @@ pub const PostgresSQLConnection = struct {
}
var request = this.current() orelse return error.ExpectedRequest;
_ = this.requests.discard(1);
+ this.updateRef();
+
request.onError(err, this.globalObject);
},
.PortalSuspended => {
@@ -2663,6 +2710,7 @@ pub const PostgresSQLConnection = struct {
try reader.eatMessage(protocol.EmptyQueryResponse);
var request = this.current() orelse return error.ExpectedRequest;
_ = this.requests.discard(1);
+ this.updateRef();
request.onSuccess("", this.globalObject);
},
.CopyOutResponse => {
@@ -2678,6 +2726,15 @@ pub const PostgresSQLConnection = struct {
}
}
+ pub fn updateRef(this: *PostgresSQLConnection) void {
+ this.updateHasPendingActivity();
+ if (this.has_pending_activity.loadUnchecked()) {
+ this.poll_ref.ref(this.globalObject.bunVM());
+ } else {
+ this.poll_ref.unref(this.globalObject.bunVM());
+ }
+ }
+
pub fn doFlush(this: *PostgresSQLConnection, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue {
_ = callframe;
_ = globalObject;