aboutsummaryrefslogtreecommitdiff
path: root/src/sql/postgres.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r--src/sql/postgres.zig112
1 files changed, 71 insertions, 41 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig
index 7c561cf7e..039452d5c 100644
--- a/src/sql/postgres.zig
+++ b/src/sql/postgres.zig
@@ -1633,14 +1633,8 @@ pub const PostgresSQLQuery = struct {
this.deref();
var vm = JSC.VirtualMachine.get();
- // TODO: error handling
- _ = vm.rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis(
- globalObject,
- targetValue,
- &[_]JSC.JSValue{
- JSC.JSValue.undefined,
- },
- );
+ const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?;
+ globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, JSC.JSValue.undefined });
}
pub fn onError(this: *@This(), err: protocol.ErrorResponse, globalObject: *JSC.JSGlobalObject) void {
const thisValue = this.thisValue;
@@ -1671,13 +1665,9 @@ pub const PostgresSQLQuery = struct {
this.deref();
// TODO: error handling
- _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryRejectFn.get().?.callWithThis(
- globalObject,
- targetValue,
- &[_]JSC.JSValue{
- instance,
- },
- );
+ var vm = JSC.VirtualMachine.get();
+ const function = vm.rareData().postgresql_context.onQueryRejectFn.get().?;
+ globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, instance });
}
pub fn onSuccess(this: *@This(), _: []const u8, globalObject: *JSC.JSGlobalObject) void {
@@ -1692,14 +1682,9 @@ pub const PostgresSQLQuery = struct {
this.status = .success;
this.deref();
- // TODO: error handling
- _ = JSC.VirtualMachine.get().rareData().postgresql_context.onQueryResolveFn.get().?.callWithThis(
- globalObject,
- targetValue,
- &[_]JSC.JSValue{
- pending_value,
- },
- );
+ var vm = JSC.VirtualMachine.get();
+ const function = vm.rareData().postgresql_context.onQueryResolveFn.get().?;
+ globalObject.queueMicrotask(function, &[_]JSC.JSValue{ targetValue, pending_value });
}
pub fn constructor(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) ?*PostgresSQLQuery {
@@ -1773,18 +1758,38 @@ pub const PostgresSQLQuery = struct {
this.target = query;
this.thisValue = callframe.this();
const binding_value = PostgresSQLQuery.bindingGetCached(callframe.this()) orelse .zero;
+ var query_str = this.query.toUTF8(bun.default_allocator);
+ defer query_str.deinit();
+
+ var signature = Signature.generate(globalObject, query_str.slice(), binding_value) catch |err| {
+ globalObject.throwError(err, "failed to generate signature");
+ return .zero;
+ };
var writer = connection.writer();
- if (this.statement) |stmt| {
- PostgresRequest.bindAndExecute(globalObject, stmt, binding_value, PostgresSQLConnection.Writer, writer) catch |err| {
+
+ var entry = connection.statements.getOrPut(bun.default_allocator, bun.hash(signature.name)) catch |err| {
+ globalObject.throwError(err, "failed to allocate statement");
+ signature.deinit();
+ return .zero;
+ };
+ if (entry.found_existing) {
+ this.statement = entry.value_ptr.*;
+ this.statement.?.ref();
+ signature.deinit();
+
+ PostgresRequest.bindAndExecute(globalObject, this.statement.?, binding_value, PostgresSQLConnection.Writer, writer) catch |err| {
globalObject.throwError(err, "failed to bind and execute query");
+
return .zero;
};
} else {
- const signature = PostgresRequest.prepareAndQuery(globalObject, this.query, binding_value, PostgresSQLConnection.Writer, writer) catch |err| {
- globalObject.throwError(err, "failed to prepare query");
+ PostgresRequest.prepareAndQueryWithSignature(globalObject, query_str.slice(), binding_value, PostgresSQLConnection.Writer, writer, &signature) catch |err| {
+ globalObject.throwError(err, "failed to prepare and query");
+ signature.deinit();
return .zero;
};
+
var stmt = bun.default_allocator.create(PostgresSQLStatement) catch |err| {
globalObject.throwError(err, "failed to allocate statement");
return .zero;
@@ -1792,16 +1797,19 @@ pub const PostgresSQLQuery = struct {
stmt.* = .{
.signature = signature,
+ .ref_count = 2,
};
this.statement = stmt;
+ entry.value_ptr.* = stmt;
}
- connection.flushData();
-
connection.requests.writeItem(this) catch {};
this.ref();
this.status = .running;
+ if (connection.is_ready_for_query)
+ connection.flushData();
+
return .undefined;
}
@@ -1957,6 +1965,27 @@ pub const PostgresRequest = struct {
}
}
+ pub fn prepareAndQueryWithSignature(
+ globalObject: *JSC.JSGlobalObject,
+ query: []const u8,
+ array_value: JSC.JSValue,
+ comptime Context: type,
+ writer: protocol.NewWriter(Context),
+ signature: *Signature,
+ ) !void {
+ try writeQuery(query, signature.name, signature.fields, Context, writer);
+ try writeBind(signature.name, bun.String.empty, globalObject, array_value, Context, writer);
+ var exec = protocol.Execute{
+ .p = .{
+ .prepared_statement = signature.name,
+ },
+ };
+ try exec.writeInternal(Context, writer);
+
+ try writer.write(&protocol.Flush);
+ try writer.write(&protocol.Sync);
+ }
+
pub fn prepareAndQuery(
globalObject: *JSC.JSGlobalObject,
query: bun.String,
@@ -1971,16 +2000,7 @@ pub const PostgresRequest = struct {
signature.deinit();
}
- try writeQuery(query_.slice(), signature.name, signature.fields, Context, writer);
- try writeBind(signature.name, bun.String.empty, globalObject, array_value, Context, writer);
- var exec = protocol.Execute{
- .p = .{
- .prepared_statement = signature.name,
- },
- };
- try exec.writeInternal(Context, writer);
-
- try writer.write(&protocol.Flush);
+ try prepareAndQueryWithSignature(globalObject, query_.slice(), array_value, Context, writer, &signature);
return signature;
}
@@ -2001,6 +2021,7 @@ pub const PostgresRequest = struct {
try exec.writeInternal(Context, writer);
try writer.write(&protocol.Flush);
+ try writer.write(&protocol.Sync);
}
pub fn onData(
@@ -2157,6 +2178,8 @@ pub const PostgresSQLConnection = struct {
}
pub fn onClose(this: *PostgresSQLConnection) void {
+ var vm = this.globalObject.bunVM();
+ defer vm.drainMicrotasks();
this.fail("Connection closed", error.ConnectionClosed);
}
@@ -2176,15 +2199,20 @@ pub const PostgresSQLConnection = struct {
}
pub fn onTimeout(this: *PostgresSQLConnection) void {
- _ = this;
+ var vm = this.globalObject.bunVM();
+ defer vm.drainMicrotasks();
debug("onTimeout", .{});
}
pub fn onDrain(this: *PostgresSQLConnection) void {
+ var vm = this.globalObject.bunVM();
+ defer vm.drainMicrotasks();
this.flushData();
}
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
+ var vm = this.globalObject.bunVM();
+ defer vm.drainMicrotasks();
if (this.read_buffer.remaining().len == 0) {
var consumed: usize = 0;
var offset: usize = 0;
@@ -2615,6 +2643,7 @@ pub const PostgresSQLConnection = struct {
this.setStatus(.connected);
this.is_ready_for_query = true;
+ this.socket.setTimeout(300);
this.flushData();
},
@@ -2637,7 +2666,8 @@ pub const PostgresSQLConnection = struct {
},
.ParseComplete => {
try reader.eatMessage(protocol.ParseComplete);
- _ = this.current() orelse return error.ExpectedRequest;
+ var request = this.current() orelse return error.ExpectedRequest;
+ _ = request;
},
.ParameterDescription => {
var description: protocol.ParameterDescription = undefined;