diff options
author | 2023-10-11 01:59:09 -0700 | |
---|---|---|
committer | 2023-10-13 18:08:24 -0700 | |
commit | 9bc76f6628239f427c7b057a23a73e366fc0d509 (patch) | |
tree | c63a1c2a0d6ae31ef6706e7a305508be25e09a8f /src/sql/postgres.zig | |
parent | 0a6ef179f89b71492458134aae4af9cd32b1e70d (diff) | |
download | bun-9bc76f6628239f427c7b057a23a73e366fc0d509.tar.gz bun-9bc76f6628239f427c7b057a23a73e366fc0d509.tar.zst bun-9bc76f6628239f427c7b057a23a73e366fc0d509.zip |
wip
Diffstat (limited to 'src/sql/postgres.zig')
-rw-r--r-- | src/sql/postgres.zig | 280 |
1 files changed, 239 insertions, 41 deletions
diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index 37bc47b21..abc9b4e51 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -42,21 +42,58 @@ const Data = union(enum) { }; pub const protocol = struct { - pub fn NewWriterWrap(comptime Context: type, comptime writeFunction_: (fn (ctx: Context, bytes: []const u8) anyerror!void)) type { + pub const ArrayList = struct { + array: *std.ArrayList(u8), + + pub fn offset(this: @This()) usize { + return this.array.items.len; + } + + pub fn write(this: @This(), bytes: []const u8) anyerror!void { + try this.array.appendSlice(bytes); + } + + pub fn pwrite(this: @This(), bytes: []const u8, i: usize) anyerror!void { + @memcpy(this.array.items[i..][0..bytes.len], bytes); + } + + pub const Writer = NewWriter(@This()); + }; + + pub fn NewWriterWrap( + comptime Context: type, + comptime offsetFn_: (fn (ctx: Context) usize), + comptime writeFunction_: (fn (ctx: Context, bytes: []const u8) anyerror!void), + comptime pwriteFunction_: (fn (ctx: Context, bytes: []const u8, offset: usize) anyerror!void), + ) type { return struct { wrapped: Context, const writeFn = writeFunction_; + const pwriteFn = pwriteFunction_; + const offsetFn = offsetFn_; pub const Ctx = Context; pub inline fn write(this: @This(), data: []const u8) anyerror!void { try writeFn(this.wrapped, data); } + pub inline fn offset(this: @This()) usize { + return offsetFn(this.wrapped); + } + + pub inline fn pwrite(this: @This(), data: []const u8, i: usize) anyerror!void { + try pwriteFn(this.wrapped, data, i); + } + pub fn @"i32"(this: @This(), value: i32) !void { try this.write(std.mem.asBytes(&@byteSwap(value))); } + pub fn @"f64"(this: @This(), value: f64) !void { + try this.write(std.mem.asBytes(&@byteSwap(value))); + } + pub fn @"i16"(this: @This(), value: i16) !void { try this.write(std.mem.asBytes(&@byteSwap(value))); } @@ -73,7 +110,20 @@ pub const protocol = struct { try this.write(&[_]u8{0}); } + pub fn boolean(this: @This(), value: bool) !void { + try this.write(if (value) "t" else "f"); + } + + pub fn @"null"(this: @This()) !void { + try this.i32(0xFFFFFFFF); + } + pub fn String(this: @This(), value: bun.String) !void { + if (value.isEmpty()) { + try this.write(&[_]u8{0}); + return; + } + var sliced = value.toUTF8(bun.default_allocator); defer sliced.deinit(); var slice = sliced.slice(); @@ -276,7 +326,7 @@ pub const protocol = struct { } pub fn NewWriter(comptime Context: type) type { - return NewWriterWrap(Context, Context.writeData); + return NewWriterWrap(Context, Context.offset, Context.write, Context.pwrite); } comptime { @@ -1306,10 +1356,176 @@ const PreparedStatementsMap = std.HashMapUnmanaged(u64, *PostgresSQLStatement, b pub const PostgresSQLContext = struct { tcp: ?*uws.SocketContext = null, - ssl: ?*uws.SocketContext = null, - onQueryCompleteFn: JSC.Strong = .{}, - onStatementCompleteFn: JSC.Strong = .{}, + onQueryResolveFn: JSC.Strong = .{}, + onQueryRejectFn: JSC.Strong = .{}, +}; + +pub const PostgresSQLQuery = struct { + statement: ?*PostgresSQLStatement = null, + query: bun.String = bun.String.empty, + cursor_name: bun.String = bun.String.empty, + + pub fn constructor(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) ?*PostgresSQLQuery { + _ = callframe; + _ = globalThis; + } + + pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + var arguments_ = callframe.arguments(3); + const arguments = arguments_.slice(); + var connection = arguments[0].as(PostgresSQLConnection) orelse { + globalObject.throwTypeError("connection must be a PostgresSQLConnection"); + return .zero; + }; + _ = connection; + + var values_array = arguments[1]; + + var query = arguments[1]; + + if (!values_array.jsType().isArray()) { + globalObject.throwInvalidArgumentType("run", "values", "Array"); + return .zero; + } + if (!query.isString()) { + globalObject.throwInvalidArgumentType("run", "query", "string"); + return .zero; + } + + var statement = this.statement orelse {}; + _ = statement; + + return .undefined; + } + + pub fn doCancel(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = callframe; + _ = globalObject; + _ = this; + + return .undefined; + } +}; + +pub const PostgresRequest = struct { + pub fn writeBind( + statement: *PostgresSQLStatement, + cursor_name: bun.String, + globalObject: *JSC.JSGlobalObject, + values_array: JSC.JSValue, + comptime Context: type, + writer: protocol.NewWriter(Context), + ) !void { + try writer.bytes("B"); + const length_offset = writer.offset(); + try writer.i32(0); + + try writer.string(statement.name); + try writer.String(cursor_name); + + var iter = JSC.JSArrayIterator.init(values_array, globalObject); + + try writer.i16(@truncate(iter.len)); + + while (iter.next()) |value| { + if (value.isUndefinedOrNull()) { + try writer.i16(0); + continue; + } + + const tag = types.Tag.fromJS(globalObject, value); + switch (tag) { + .bytea, .number => { + try writer.i16(0); + }, + else => { + try writer.i16(1); + }, + } + } + + try writer.i16(@truncate(iter.len)); + + iter = JSC.JSArrayIterator.init(values_array, globalObject); + + while (iter.next()) |value| { + if (value.isUndefinedOrNull()) { + try writer.i32(4); + try writer.null(); + continue; + } + + const tag = types.Tag.fromJS(globalObject, value); + switch (tag) { + .number => { + if (value.isInt32()) { + try writer.i32(4); + try writer.i32(value.to(i32)); + } else { + try writer.i32(8); + try writer.f64(value.coerceToDouble(globalObject)); + } + }, + .json => { + var str = bun.String.empty; + value.jsonStringify(globalObject, 0, &str); + try writer.String(str); + }, + .boolean => { + try writer.boolean(value.toBoolean()); + try writer.write(&[_]u8{0}); + }, + .time, .datetime, .date => { + var buf = std.mem.zeroes([28]u8); + const str = value.toISOString(globalObject, &buf); + try writer.string(str); + }, + .bytea => { + var bytes: []const u8 = ""; + if (value.asArrayBuffer(globalObject)) |buf| { + bytes = buf.byteSlice(); + } + try writer.i32(@truncate(bytes.len)); + try writer.bytes(bytes); + }, + else => { + // TODO: check if this leaks + var str = value.toBunString(globalObject); + try writer.str(str); + }, + } + } + + try writer.pwrite(&@byteSwap(std.mem.toBytes(@as(i32, @intCast(writer.offset())))), length_offset); + } + + pub fn writeQuery( + query: bun.String, + statement: *PostgresSQLStatement, + comptime Context: type, + writer: protocol.NewWriter(Context), + ) !void { + { + var query_str = query.toUTF8(bun.default_allocator); + defer query_str.deinit(); + var q = protocol.Query{ + .message = Data{ + .temporary = query_str.slice(), + }, + }; + try q.writeInternal(Context, writer); + } + + { + var d = protocol.Describe{ + .p = .{ + .prepared_statement = statement.name, + }, + }; + try d.writeInternal(Context, writer); + } + } }; pub const PostgresSQLConnection = struct { @@ -1318,6 +1534,7 @@ pub const PostgresSQLConnection = struct { ref_count: u32 = 0, write_buffer: bun.ByteList = .{}, + requests: PostgresRequest.Queue, poll_ref: bun.JSC.PollRef = .{}, globalObject: *JSC.JSGlobalObject, @@ -1326,6 +1543,8 @@ pub const PostgresSQLConnection = struct { has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), js_value: JSC.JSValue = JSC.JSValue.undefined, + pub usingnamespace JSC.Codegen.JSPostgresSQLConnection; + pub fn hasPendingActivity(this: *PostgresSQLConnection) callconv(.C) bool { @fence(.Acquire); return this.has_pending_activity.load(.Acquire); @@ -1389,29 +1608,17 @@ pub const PostgresSQLConnection = struct { }; pub const PostgresSQLStatement = struct { - connection: ?*PostgresSQLConnection = null, + structure: JSC.Strong = .{}, name: []const u8 = "", ref_count: u32 = 0, - poll_ref: JSC.PollRef = .{}, - has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), - - pub fn hasPendingActivity(this: *PostgresSQLStatement) callconv(.C) bool { - return this.has_pending_activity.load(.Monotonic); - } + fields: []const protocol.FieldDescription = &[_]protocol.FieldDescription{}, - pub fn deinit(this: *PostgresSQLStatement) void { - if (this.connection) |conn| conn.deref(); - this.poll_ref.deactivate(JSC.VirtualMachine.get().event_loop_handle.?); - - bun.default_allocator.free(this.name); - bun.default_allocator.destroy(this); - } - - pub fn finalize(this: *PostgresSQLStatement) callconv(.C) void { - this.unref(); + pub fn ref(this: *@This()) void { + std.debug.assert(this.ref_count > 0); + this.ref_count += 1; } - pub fn unref(this: *PostgresSQLStatement) void { + pub fn deref(this: *@This()) void { const ref_count = this.ref_count; this.ref_count -= 1; @@ -1420,28 +1627,19 @@ pub const PostgresSQLStatement = struct { } } - pub fn doRef(this: *PostgresSQLStatement, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) void { - _ = callframe; - this.poll_ref.ref(globalObject.bunVM()); - } + pub fn deinit(this: *PostgresSQLStatement) void { + std.debug.assert(this.ref_count == 0); - pub fn doUnref(this: *PostgresSQLStatement, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) void { - _ = callframe; - this.poll_ref.unref(globalObject.bunVM()); + for (this.fields) |*field| { + field.deinit(); + } + bun.default_allocator.free(this.fields); + this.structure.deinit(); + bun.default_allocator.free(this.name); + bun.default_allocator.destroy(this); } }; -pub const Host = struct { - hostname: []const u8, - port: u16, - - database: []const u8, - user: []const u8, - password: []const u8 = "", - - ssl: bool = false, -}; - pub const Status = enum { disconnected, connecting, |