diff options
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r-- | src/sql/postgres.zig | 112 |
1 files changed, 71 insertions, 41 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index 7c561cf7e..039452d5c 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -1633,14 +1633,8 @@ pub const PostgresSQLQuery = struct { this.deref(); var vm = JSC.VirtualMachine.get(); - // TODO: error handling - _ = vm.rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis( - globalObject, - targetValue, - &[_]JSC.JSValue{ - JSC.JSValue.undefined, - }, - ); + const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?; + globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, JSC.JSValue.undefined }); } pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void { const thisValue = this.thisValue; @@ -1671,13 +1665,9 @@ pub const PostgresSQLQuery = struct { this.deref(); // TODO: error handling - _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryRejectFn.get().?.callWithThis( - globalObject, - targetValue, - &[_]JSC.JSValue{ - instance, - }, - ); + var vm = JSC.VirtualMachine.get(); + const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?; + globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, instance }); } pub fn onSuccess(this: *@This(), _: []const u8, globalObject: *JSC.JSGlobalObject) void { @@ -1692,14 +1682,9 @@ pub const PostgresSQLQuery = struct { this.status = .success; this.deref(); - // TODO: error handling - _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis( - globalObject, - targetValue, - &[_]JSC.JSValue{ - pending_value, - }, - ); + var vm = JSC.VirtualMachine.get(); + const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?; + globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, pending_value }); } pub fn constructor(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) ?*PostgresSQLQuery { @@ -1773,18 +1758,38 @@ pub const PostgresSQLQuery = struct { this.target = query; this.thisValue = callframe.this(); const binding_value = PostgresSQLQuery.bindingGetCached(callframe.this()) orelse .zero; + var query_str = this.query.toUTF8(bun.default_allocator); + defer query_str.deinit(); + + var signature = Signature.generate(globalObject, query_str.slice(), binding_value) catch |err| { + globalObject.throwError(err, "failed to generate signature"); + return .zero; + }; var writer = connection.writer(); - if (this.statement) |stmt| { - PostgresRequest.bindAndExecute(globalObject, stmt, binding_value, PostgresSQLConnection.Writer, writer) catch |err| { + + var entry = connection.statements.getOrPut(bun.default_allocator, bun.hash(signature.name)) catch |err| { + globalObject.throwError(err, "failed to allocate statement"); + signature.deinit(); + return .zero; + }; + if (entry.found_existing) { + this.statement = entry.value_ptr.*; + this.statement.?.ref(); + signature.deinit(); + + PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, PostgresSQLConnection.Writer, writer) catch |err| { globalObject.throwError(err, "failed to bind and execute query"); + return .zero; }; } else { - const signature = PostgresRequest.prepareAndQuery(globalObject, this.query, binding_value, PostgresSQLConnection.Writer, writer) catch |err| { - globalObject.throwError(err, "failed to prepare query"); + PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| { + globalObject.throwError(err, "failed to prepare and query"); + signature.deinit(); return .zero; }; + var stmt = bun.default_allocator.create(PostgresSQLStatement) catch |err| { globalObject.throwError(err, "failed to allocate statement"); return .zero; @@ -1792,16 +1797,19 @@ pub const PostgresSQLQuery = struct { stmt.* = .{ .signature = signature, + .ref_count = 2, }; this.statement = stmt; + entry.value_ptr.* = stmt; } - connection.flushData(); - connection.requests.writeItem(this) catch {}; this.ref(); this.status = .running; + if (connection.is_ready_for_query) + connection.flushData(); + return .undefined; } @@ -1957,6 +1965,27 @@ pub const PostgresRequest = struct { } } + pub fn prepareAndQueryWithSignature( + globalObject: *JSC.JSGlobalObject, + query: []const u8, + array_value: JSC.JSValue, + comptime Context: type, + writer: protocol.NewWriter(Context), + signature: *Signature, + ) !void { + try writeQuery(query, signature.name, signature.fields, Context, writer); + try writeBind(signature.name, bun.String.empty, globalObject, array_value, Context, writer); + var exec = protocol.Execute{ + .p = .{ + .prepared_statement = signature.name, + }, + }; + try exec.writeInternal(Context, writer); + + try writer.write(&protocol.Flush); + try writer.write(&protocol.Sync); + } + pub fn prepareAndQuery( globalObject: *JSC.JSGlobalObject, query: bun.String, @@ -1971,16 +2000,7 @@ pub const PostgresRequest = struct { signature.deinit(); } - try writeQuery(query_.slice(), signature.name, signature.fields, Context, writer); - try writeBind(signature.name, bun.String.empty, globalObject, array_value, Context, writer); - var exec = protocol.Execute{ - .p = .{ - .prepared_statement = signature.name, - }, - }; - try exec.writeInternal(Context, writer); - - try writer.write(&protocol.Flush); + try prepareAndQueryWithSignature(globalObject, query_.slice(), array_value, Context, writer, &signature); return signature; } @@ -2001,6 +2021,7 @@ pub const PostgresRequest = struct { try exec.writeInternal(Context, writer); try writer.write(&protocol.Flush); + try writer.write(&protocol.Sync); } pub fn onData( @@ -2157,6 +2178,8 @@ pub const PostgresSQLConnection = struct { } pub fn onClose(this: *PostgresSQLConnection) void { + var vm = this.globalObject.bunVM(); + defer vm.drainMicrotasks(); this.fail("Connection closed", error.ConnectionClosed); } @@ -2176,15 +2199,20 @@ pub const PostgresSQLConnection = struct { } pub fn onTimeout(this: *PostgresSQLConnection) void { - _ = this; + var vm = this.globalObject.bunVM(); + defer vm.drainMicrotasks(); debug("onTimeout", .{}); } pub fn onDrain(this: *PostgresSQLConnection) void { + var vm = this.globalObject.bunVM(); + defer vm.drainMicrotasks(); this.flushData(); } pub fn onData(this: *PostgresSQLConnection, data: []const u8) void { + var vm = this.globalObject.bunVM(); + defer vm.drainMicrotasks(); if (this.read_buffer.remaining().len == 0) { var consumed: usize = 0; var offset: usize = 0; @@ -2615,6 +2643,7 @@ pub const PostgresSQLConnection = struct { this.setStatus(.connected); this.is_ready_for_query = true; + this.socket.setTimeout(300); this.flushData(); }, @@ -2637,7 +2666,8 @@ pub const PostgresSQLConnection = struct { }, .ParseComplete => { try reader.eatMessage(protocol.ParseComplete); - _ = this.current() orelse return error.ExpectedRequest; + var request = this.current() orelse return error.ExpectedRequest; + _ = request; }, .ParameterDescription => { var description: protocol.ParameterDescription = undefined; |