diff options
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r-- | src/sql/postgres.zig | 151 |
1 files changed, 104 insertions, 47 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index 4352f7db3..7c561cf7e 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -140,10 +140,30 @@ pub const protocol = struct { const offsetFn = offsetFn_; pub const Ctx = Context; + pub const WrappedWriter = @This(); + pub inline fn write(this: @This(), data: []const u8) anyerror!void { try writeFn(this.wrapped, data); } + pub const LengthWriter = struct { + index: usize, + context: WrappedWriter, + + pub fn write(this: LengthWriter) anyerror!void { + try this.context.pwrite(&Int32(this.context.offset() - this.index), this.index); + } + }; + + pub inline fn length(this: @This()) anyerror!LengthWriter { + const i = this.offset(); + try this.int32(0); + return LengthWriter{ + .index = i, + .context = this, + }; + } + pub inline fn offset(this: @This()) usize { return offsetFn(this.wrapped); } @@ -802,7 +822,7 @@ pub const protocol = struct { .name = .{ .owned = try name.toOwned() }, }; - try reader.skip(12); + try reader.skip(2 + 4 + 2); } pub const decode = decoderWrap(FieldDescription, decodeInternal).decode; @@ -936,7 +956,7 @@ pub const protocol = struct { writer: NewWriter(Context), ) !void { const parameters = this.params; - const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)) + zCount(this.name) + zCount(this.query); + const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)) + @max(zCount(this.name), 1) + @max(zCount(this.query), 1); const header = [_]u8{ 'P', } ++ toBytes(Int32(count)); @@ -1125,7 +1145,7 @@ pub const protocol = struct { } pub const Execute = struct { - max_rows: int32 = std.math.maxInt(int32), + max_rows: int32 = 0, p: PortalOrPreparedStatement, pub fn writeInternal( @@ -1133,14 +1153,14 @@ pub const protocol = struct { comptime Context: type, writer: NewWriter(Context), ) !void { - const message = this.p.slice(); - const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 1; - const header = [_]u8{ - 'E', - } ++ toBytes(Int32(count)); - try writer.write(&header); - try writer.string(message); + try writer.write("E"); + const length = try writer.length(); + if (this.p == .portal) + try writer.string(this.p.portal) + else + try writer.write(&[_]u8{0}); try writer.int32(this.max_rows); + try length.write(); } pub const write = writeWrap(@This(), writeInternal).write; @@ -1155,16 +1175,15 @@ pub const protocol = struct { writer: NewWriter(Context), ) !void { const message = this.p.slice(); - const count: usize = @sizeOf((u32)) + @sizeOf((u32)) + message.len + 2; - const header = [_]u8{ + try writer.write(&[_]u8{ 'D', - } ++ toBytes(Int32(count)); - try writer.write(&header); + }); + const length = try writer.length(); try writer.write(&[_]u8{ this.p.tag(), - 0, }); try writer.string(message); + try length.write(); } pub const write = writeWrap(@This(), writeInternal).write; @@ -1564,13 +1583,17 @@ pub const PostgresSQLQuery = struct { pub usingnamespace JSC.Codegen.JSPostgresSQLQuery; - pub const Status = enum { + pub const Status = enum(u8) { pending, running, success, fail, }; + pub fn hasPendingActivity(this: *@This()) callconv(.C) bool { + return this.status == .running; + } + pub fn deinit(this: *@This()) void { if (this.statement) |statement| { statement.deref(); @@ -1601,7 +1624,9 @@ pub const PostgresSQLQuery = struct { } pub fn onNoData(this: *@This(), globalObject: *JSC.JSGlobalObject) void { - if (this.thisValue == .zero) { + const thisValue = this.thisValue; + const targetValue = this.target; + if (thisValue == .zero) { this.deref(); return; } @@ -1611,14 +1636,16 @@ pub const PostgresSQLQuery = struct { // TODO: error handling _ = vm.rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis( globalObject, - this.thisValue, + targetValue, &[_]JSC.JSValue{ JSC.JSValue.undefined, }, ); } pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void { - if (this.thisValue == .zero) { + const thisValue = this.thisValue; + const targetValue = this.target; + if (thisValue == .zero) { this.deref(); return; } @@ -1638,14 +1665,15 @@ pub const PostgresSQLQuery = struct { _ = b.append(str.slice()); _ = b.append("\n"); } - const instance = globalObject.createSyntaxErrorInstance("Postgres error occurred\n{s}", .{b.allocatedSlice()}); + const instance = globalObject.createSyntaxErrorInstance("Postgres error occurred\n{s}", .{b.allocatedSlice()[0..b.len]}); + this.status = .fail; b.deinit(bun.default_allocator); this.deref(); // TODO: error handling _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryRejectFn.get().?.callWithThis( globalObject, - this.thisValue, + targetValue, &[_]JSC.JSValue{ instance, }, @@ -1653,17 +1681,21 @@ pub const PostgresSQLQuery = struct { } pub fn onSuccess(this: *@This(), _: []const u8, globalObject: *JSC.JSGlobalObject) void { - if (this.thisValue == .zero) { + const thisValue = this.thisValue; + const targetValue = this.target; + if (thisValue == .zero) { this.deref(); return; } - const pending_value = PostgresSQLQuery.pendingValueGetCached(this.thisValue) orelse JSC.JSValue.undefined; + const pending_value = PostgresSQLQuery.pendingValueGetCached(thisValue) orelse JSC.JSValue.undefined; + this.status = .success; this.deref(); + // TODO: error handling _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis( globalObject, - this.thisValue, + targetValue, &[_]JSC.JSValue{ pending_value, }, @@ -1768,6 +1800,7 @@ pub const PostgresSQLQuery = struct { connection.requests.writeItem(this) catch {}; this.ref(); + this.status = .running; return .undefined; } @@ -1796,40 +1829,44 @@ pub const PostgresRequest = struct { comptime Context: type, writer: protocol.NewWriter(Context), ) !void { - try writer.bytes("B"); - const length_offset = writer.offset(); - try writer.int32(0); + try writer.write("B"); + const length = try writer.length(); - try writer.string(name); try writer.String(cursor_name); + try writer.string(name); var iter = JSC.JSArrayIterator.init(values_array, globalObject); - try writer.short(@intCast(iter.len)); + if (iter.len > 0) { + try writer.short(@intCast(iter.len)); - while (iter.next()) |value| { - if (value.isUndefinedOrNull()) { - try writer.short(0); - continue; - } + while (iter.next()) |value| { + if (value.isUndefinedOrNull()) { + try writer.short(0); + continue; + } - const tag = try types.Tag.fromJS(globalObject, value); + const tag = try types.Tag.fromJS(globalObject, value); - switch (tag) { - .bytea, .number => { - try writer.short(0); - }, - else => { - try writer.short(1); - }, + switch (tag) { + .bytea, .number => { + try writer.short(0); + }, + else => { + try writer.short(1); + }, + } } - } - try writer.short(@intCast(iter.len)); + try writer.short(@intCast(iter.len)); + } else { + try writer.short(0); + try writer.short(0); + } iter = JSC.JSArrayIterator.init(values_array, globalObject); - debug("Bind: {} ({d})", .{ bun.strings.QuotedFormatter{ .text = name }, iter.len }); + debug("Bind: {} ({d} args)", .{ bun.strings.QuotedFormatter{ .text = name }, iter.len }); while (iter.next()) |value| { if (value.isUndefinedOrNull()) { @@ -1887,7 +1924,9 @@ pub const PostgresRequest = struct { } } - try writer.pwrite(&std.mem.toBytes(@byteSwap(@as(int32, @intCast(writer.offset())))), length_offset); + try writer.short(0); + + try length.write(); } pub fn writeQuery( @@ -2069,6 +2108,8 @@ pub const PostgresSQLConnection = struct { .connected => { const on_connect = this.on_connect.swap(); if (on_connect == .zero) return; + this.poll_ref.unref(this.globalObject.bunVM()); + this.updateHasPendingActivity(); _ = on_connect.callWithThis( this.globalObject, this.js_value, @@ -2123,6 +2164,7 @@ pub const PostgresSQLConnection = struct { this.socket = socket; this.poll_ref.ref(this.globalObject.bunVM()); + this.updateHasPendingActivity(); var msg = protocol.StartupMessage{ .user = Data{ .temporary = this.user }, .database = Data{ .temporary = this.database }, .options = Data{ .temporary = this.options } }; msg.writeInternal(Writer, this.writer()) catch |err| { @@ -2400,7 +2442,7 @@ pub const PostgresSQLConnection = struct { } pub fn offset(this: Writer) usize { - return this.connection.write_buffer.head; + return this.connection.write_buffer.len(); } }; @@ -2586,6 +2628,7 @@ pub const PostgresSQLConnection = struct { } debug("-> {s}", .{cmd.command_tag.slice()}); _ = this.requests.discard(1); + this.updateRef(); request.onSuccess(cmd.command_tag.slice(), this.globalObject); }, .BindComplete => { @@ -2621,6 +2664,8 @@ pub const PostgresSQLConnection = struct { try reader.eatMessage(protocol.NoData); var request = this.current() orelse return error.ExpectedRequest; _ = this.requests.discard(1); + this.updateRef(); + request.onNoData(this.globalObject); }, .BackendKeyData => { @@ -2634,6 +2679,8 @@ pub const PostgresSQLConnection = struct { } var request = this.current() orelse return error.ExpectedRequest; _ = this.requests.discard(1); + this.updateRef(); + request.onError(err, this.globalObject); }, .PortalSuspended => { @@ -2663,6 +2710,7 @@ pub const PostgresSQLConnection = struct { try reader.eatMessage(protocol.EmptyQueryResponse); var request = this.current() orelse return error.ExpectedRequest; _ = this.requests.discard(1); + this.updateRef(); request.onSuccess("", this.globalObject); }, .CopyOutResponse => { @@ -2678,6 +2726,15 @@ pub const PostgresSQLConnection = struct { } } + pub fn updateRef(this: *PostgresSQLConnection) void { + this.updateHasPendingActivity(); + if (this.has_pending_activity.loadUnchecked()) { + this.poll_ref.ref(this.globalObject.bunVM()); + } else { + this.poll_ref.unref(this.globalObject.bunVM()); + } + } + pub fn doFlush(this: *PostgresSQLConnection, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { _ = callframe; _ = globalObject; |