diff options
author | 2023-10-14 03:06:21 -0700 | |
---|---|---|
committer | 2023-10-14 03:06:21 -0700 | |
commit | 7f5eddc09668592c754d6aad12015b8144d15a76 (patch) | |
tree | b5541e4dfb86f765151a18be37d04622714f1bca | |
parent | 9bc76f6628239f427c7b057a23a73e366fc0d509 (diff) | |
download | bun-7f5eddc09668592c754d6aad12015b8144d15a76.tar.gz bun-7f5eddc09668592c754d6aad12015b8144d15a76.tar.zst bun-7f5eddc09668592c754d6aad12015b8144d15a76.zip |
More progress
-rw-r--r-- | src/bun.js/api/postgres.classes.ts | 2 | ||||
-rw-r--r-- | src/bun.js/bindings/bindings.zig | 19 | ||||
-rw-r--r-- | src/bun.js/bindings/structure.cpp | 34 | ||||
-rw-r--r-- | src/sql/postgres.zig | 386 |
4 files changed, 405 insertions, 36 deletions
diff --git a/src/bun.js/api/postgres.classes.ts b/src/bun.js/api/postgres.classes.ts index 1423e0da1..671ecfd6f 100644 --- a/src/bun.js/api/postgres.classes.ts +++ b/src/bun.js/api/postgres.classes.ts @@ -54,7 +54,7 @@ export default [ length: 0, }, }, - values: ["instance"], + values: ["pendingValue"], estimatedSize: true, }), ]; diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 278ccd20e..60fbac21c 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -45,6 +45,25 @@ pub const JSObject = extern struct { }); } + extern fn JSC__createStructure(*JSC.JSGlobalObject, u32, names: [*]bun.String) JSC.JSValue; + extern fn JSC__createEmptyObjectWithStructure(*JSC.JSGlobalObject, *anyopaque) JSC.JSValue; + extern fn JSC__putDirectOffset(*JSC.VM, JSC.JSValue, offset: u32, JSC.JSValue) void; + + pub fn createStructure(global: *JSGlobalObject, length: u32, names: [*]bun.String) JSValue { + JSC.markBinding(@src()); + return JSC__createStructure(global, length, names); + } + + pub fn uninitialized(global: *JSGlobalObject, structure: JSC.JSValue) JSValue { + JSC.markBinding(@src()); + return JSC__createEmptyObjectWithStructure(global, structure.asCell()); + } + + pub fn putDirectOffset(this: JSValue, vm: *VM, offset: u32, value: JSValue) void { + JSC.markBinding(@src()); + return JSC__putDirectOffset(vm, this, offset, value); + } + pub fn Initializer(comptime Ctx: type, comptime func: fn (*Ctx, obj: *JSObject, global: *JSGlobalObject) void) type { return struct { pub fn call(this: ?*anyopaque, obj: [*c]JSObject, global: [*c]JSGlobalObject) callconv(.C) void { diff --git a/src/bun.js/bindings/structure.cpp b/src/bun.js/bindings/structure.cpp new file mode 100644 index 000000000..9e1fb6dc2 --- /dev/null +++ b/src/bun.js/bindings/structure.cpp @@ -0,0 +1,34 @@ +#include "root.h" +#include <JavaScriptCore/StructureInlines.h> +#include <JavaScriptCore/ObjectPrototype.h> +#include "headers-handwritten.h" +#include <JavaScriptCore/JSCJSValueInlines.h> +#include <JavaScriptCore/ObjectConstructor.h> + +namespace Bun { +using namespace JSC; +extern "C" EncodedJSValue JSC__createStructure(JSC::JSGlobalObject* globalObject, unsigned int inlineCapacity, BunString* names) +{ + auto& vm = globalObject->vm(); + JSC::Structure* structure = JSC::Structure::create(vm, globalObject, globalObject->objectPrototype(), JSC::TypeInfo(ObjectType, JSC::JSObject::StructureFlags), JSC::JSFinalObject::info(), inlineCapacity); + PropertyOffset offset = 0; + for (unsigned i = 0; i < inlineCapacity; i++) { + const Identifier& ident = JSC::Identifier::fromString(vm, Bun::toWTFString(names[i])); + structure = structure->addPropertyTransition(vm, structure, JSC::PropertyName(ident), 0, offset); + } + + return JSValue::encode(structure); +} + +extern "C" EncodedJSValue JSC__createEmptyObjectWithStructure(JSC::JSGlobalObject* globalObject, JSC::Structure* structure) +{ + auto& vm = globalObject->vm(); + return JSValue::encode(JSC::constructEmptyObject(vm, structure)); +} + +extern "C" void JSC__putDirectOffset(JSC::VM* vm, JSC::EncodedJSValue object, unsigned int offset, JSC::EncodedJSValue value) +{ + JSValue::decode(object).getObject()->putDirectOffset(*vm, offset, JSValue::decode(value)); +} + +} diff --git a/src/sql/postgres.zig b/src/sql/postgres.zig index abc9b4e51..452457826 100644 --- a/src/sql/postgres.zig +++ b/src/sql/postgres.zig @@ -3,6 +3,7 @@ const JSC = bun.JSC; const String = bun.String; const uws = bun.uws; const std = @import("std"); +const debug = bun.Output.scoped(.Postgres, true); const Data = union(enum) { owned: bun.ByteList, temporary: []const u8, @@ -273,6 +274,12 @@ pub const protocol = struct { return try readFn(this.wrapped, count); } + pub inline fn eatMessage(this: @This(), comptime msg: []const u8) anyerror!void { + var input = try readFn(this.wrapped, msg.len); + defer input.deinit(); + if (bun.strings.eqlLong(input.slice(), msg)) return; + } + pub inline fn readZ(this: @This()) anyerror!Data { return try readZFn(this.wrapped); } @@ -592,9 +599,9 @@ pub const protocol = struct { return error.InvalidMessageLength; } - const status = try reader.bytes(1); + const status = try reader.int(u8); this.* = .{ - .status = @enumFromInt(status[0]), + .status = @enumFromInt(status), }; } @@ -615,7 +622,7 @@ pub const protocol = struct { }; pub const DataRow = struct { - pub fn decode(context: anytype, reader: NewReader(@TypeOf(context)), comptime forEach: fn (@TypeOf(context), index: i16, bytes: ?*Data) anyerror!bool) anyerror!void { + pub fn decode(context: anyopaque, comptime ContextType: type, reader: NewReader(ContextType), comptime forEach: fn (@TypeOf(context), index: i16, bytes: ?*Data) anyerror!bool) anyerror!void { var remaining_bytes = try reader.length(); remaining_bytes -|= 4; @@ -648,10 +655,7 @@ pub const protocol = struct { name: Data = .{ .empty = {} }, table_oid: i32 = 0, column_index: i16 = 0, - type_oid: i32 = 0, - type_size: i16 = 0, - type_modifier: i32 = 0, - format_code: FormatCode = FormatCode.binary, + type_oid: i16 = 0, pub fn deinit(this: *@This()) void { this.name.deinit(); @@ -665,12 +669,11 @@ pub const protocol = struct { this.* = .{ .table_oid = try reader.i32(), .column_index = try reader.i16(), - .type_oid = try reader.i32(), - .type_size = try reader.i16(), - .type_modifier = try reader.i32(), - .format_code = try FormatCode.from(try reader.i16()), + .type_oid = @truncate(try reader.i32()), .name = try name.toOwned(), }; + + try reader.skip(12); } pub const decode = decoderWrap(FieldDescription, decodeInternal).decode; @@ -771,16 +774,12 @@ pub const protocol = struct { }; pub const Parse = struct { - name: bun.String = bun.String.empty, - query: bun.String = bun.String.empty, - - /// Object IDs - paramters: std.ArrayListUnmanaged(i32) = .{}, + name: []const u8 = "", + query: []const u8 = "", + params: []const i32 = &.{}, pub fn deinit(this: *Parse) void { - this.name.deref(); - this.query.deref(); - this.paramters.deinit(bun.default_allocator); + _ = this; } pub fn writeInternal( @@ -788,7 +787,7 @@ pub const protocol = struct { comptime Context: type, writer: NewWriter(Context), ) !void { - const parameters = this.paramters.items; + const parameters = this.params; const count: usize = @sizeOf((u32)) + @sizeOf(u16) + (parameters.len * @sizeOf(u32)); const header = [_]u8{ 'P', @@ -1344,6 +1343,7 @@ pub const types = struct { value: *Data, ) anyerror!JSC.JSValue { defer value.deinit(); + // var slice = value.slice()[@min(1, value.len)..]; // _ = slice; return JSC.JSValue.createBuffer(globalObject, value.slice(), null); @@ -1365,10 +1365,13 @@ pub const PostgresSQLQuery = struct { statement: ?*PostgresSQLStatement = null, query: bun.String = bun.String.empty, cursor_name: bun.String = bun.String.empty, + thisValue: JSC.JSValue = .undefined, pub fn constructor(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) ?*PostgresSQLQuery { - _ = callframe; _ = globalThis; + const args_ = callframe.arguments(3); + const args = args_.slice(); + _ = args; } pub fn doRun(this: *PostgresSQLQuery, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { @@ -1410,7 +1413,7 @@ pub const PostgresSQLQuery = struct { pub const PostgresRequest = struct { pub fn writeBind( - statement: *PostgresSQLStatement, + name: []const u8, cursor_name: bun.String, globalObject: *JSC.JSGlobalObject, values_array: JSC.JSValue, @@ -1421,7 +1424,7 @@ pub const PostgresRequest = struct { const length_offset = writer.offset(); try writer.i32(0); - try writer.string(statement.name); + try writer.string(name); try writer.String(cursor_name); var iter = JSC.JSArrayIterator.init(values_array, globalObject); @@ -1449,8 +1452,11 @@ pub const PostgresRequest = struct { iter = JSC.JSArrayIterator.init(values_array, globalObject); + debug("Bind: {s} ({d})", .{ name, values_array, iter.len }); + while (iter.next()) |value| { if (value.isUndefinedOrNull()) { + debug(" -> NULL", .{}); try writer.i32(4); try writer.null(); continue; @@ -1459,6 +1465,7 @@ pub const PostgresRequest = struct { const tag = types.Tag.fromJS(globalObject, value); switch (tag) { .number => { + debug(" -> {s}", .{@tagName(tag)}); if (value.isInt32()) { try writer.i32(4); try writer.i32(value.to(i32)); @@ -1468,15 +1475,18 @@ pub const PostgresRequest = struct { } }, .json => { + debug(" -> {s}", .{@tagName(tag)}); var str = bun.String.empty; value.jsonStringify(globalObject, 0, &str); try writer.String(str); }, .boolean => { + debug(" -> {s}", .{@tagName(tag)}); try writer.boolean(value.toBoolean()); try writer.write(&[_]u8{0}); }, .time, .datetime, .date => { + debug(" -> {s}", .{@tagName(tag)}); var buf = std.mem.zeroes([28]u8); const str = value.toISOString(globalObject, &buf); try writer.string(str); @@ -1487,9 +1497,12 @@ pub const PostgresRequest = struct { bytes = buf.byteSlice(); } try writer.i32(@truncate(bytes.len)); + debug(" -> {s}: {d}", .{ @tagName(tag), bytes.len }); + try writer.bytes(bytes); }, else => { + debug(" -> string", .{}); // TODO: check if this leaks var str = value.toBunString(globalObject); try writer.str(str); @@ -1501,31 +1514,124 @@ pub const PostgresRequest = struct { } pub fn writeQuery( - query: bun.String, - statement: *PostgresSQLStatement, + query: []const u8, + name: []const u8, + params: []const i32, comptime Context: type, writer: protocol.NewWriter(Context), ) !void { { var query_str = query.toUTF8(bun.default_allocator); defer query_str.deinit(); - var q = protocol.Query{ - .message = Data{ - .temporary = query_str.slice(), - }, + var q = protocol.Parse{ + .name = name, + .paramters = params, + .query = query, }; try q.writeInternal(Context, writer); + debug("Parse: {s}", .{query_str}); } { var d = protocol.Describe{ .p = .{ - .prepared_statement = statement.name, + .prepared_statement = name, }, }; try d.writeInternal(Context, writer); + debug("Describe", .{}); } } + + pub fn prepareAndQuery( + globalObject: *JSC.JSGlobalObject, + query: bun.String, + array_value: JSC.JSValue, + comptime Context: type, + writer: protocol.NewWriter(Context), + ) !Signature { + var query_ = query.toUTF8(bun.default_allocator); + defer query_.deinit(); + var signature = try Signature.generate(globalObject, query_, array_value); + errdefer { + signature.deinit(); + } + + try writeQuery(query, signature.name, signature.fields, Context, writer); + try writeBind(signature.name, bun.String.empty, globalObject, array_value, Context, writer); + var exec = protocol.Execute{ + .p = .{ + .prepared_statement = signature.name, + }, + }; + try exec.writeInternal(Context, writer); + + try writer.write(protocol.Flush); + + return signature; + } + + pub fn bindAndExecute( + globalObject: *JSC.JSGlobalObject, + statement: *PostgresSQLStatement, + array_value: JSC.JSValue, + comptime Context: type, + writer: protocol.NewWriter(Context), + ) !void { + try writeBind(statement.signature.name, bun.String.empty, globalObject, array_value, Context, writer); + var exec = protocol.Execute{ + .p = .{ + .prepared_statement = statement.signature.name, + }, + }; + try exec.writeInternal(Context, writer); + } + + pub fn onData( + connection: *PostgresSQLConnection, + comptime Context: type, + reader: protocol.NewReader(Context), + ) !void { + while (!reader.isEmpty()) { + switch (try reader.int(u8)) { + 'D' => try connection.on(.DataRow, Context, reader), + 'd' => try connection.on(.CopyData, Context, reader), + 'S' => try connection.on(.ParameterStatus, Context), + 'Z' => try connection.on(.ReadyForQuery, Context, reader), + 'C' => try connection.on(.CommandComplete, Context, reader), + '2' => try connection.on(.BindComplete, Context, reader), + '1' => try connection.on(.ParseComplete, Context, reader), + 't' => try connection.on(.ParameterDescription, Context, reader), + 'T' => try connection.on(.RowDescription, Context, reader), + 'R' => try connection.on(.Authentication, Context, reader), + 'n' => try connection.on(.NoData, Context, reader), + 'K' => try connection.on(.BackendKeyData, Context, reader), + 'E' => try connection.on(.ErrorResponse, Context, reader), + 's' => try connection.on(.PortalSuspended, Context, reader), + '3' => try connection.on(.CloseComplete, Context, reader), + 'G' => try connection.on(.CopyInResponse, Context, reader), + 'N' => try connection.on(.NoticeResponse, Context, reader), + 'I' => try connection.on(.EmptyQueryResponse, Context, reader), + 'H' => try connection.on(.CopyOutResponse, Context, reader), + 'c' => try connection.on(.CopyDone, Context, reader), + 'W' => try connection.on(.CopyBothResponse, Context, reader), + + 'A' => try reader.notSupported("NotificationResponse"), + 'V' => try reader.notSupported("FunctionCallResponse"), + 'v' => try reader.notSupported("NegotiateProtocolVersion"), + + else => |c| { + debug("Unknown message: {d}", .{c}); + }, + } + } + } + + pub const Operation = union(enum) { + Query: *PostgresSQLQuery, + }; + + pub const Queue = std.fifo.LinearFifo(Operation, .Dynamic); }; pub const PostgresSQLConnection = struct { @@ -1534,6 +1640,7 @@ pub const PostgresSQLConnection = struct { ref_count: u32 = 0, write_buffer: bun.ByteList = .{}, + read_buffer: bun.ByteList = .{}, requests: PostgresRequest.Queue, poll_ref: bun.JSC.PollRef = .{}, @@ -1605,14 +1712,136 @@ pub const PostgresSQLConnection = struct { this.socket.close(0, null); } } + + fn current(this: *PostgresSQLConnection) ?*PostgresSQLQuery { + if (this.requests.readableLength() == 0) { + return null; + } + + return this.requests.peekItem(0); + } + + fn advance(this: *PostgresSQLConnection) void { + _ = this.requests.readItem(); + } + + const CellPutter = struct { + object: JSC.JSValue, + vm: *JSC.VM, + globalObject: *JSC.JSGlobalObject, + fields: []const protocol.FieldDescription, + + pub fn put(this: *const CellPutter, index_: i16, optional_bytes: ?*Data) anyerror!bool { + const index: u32 = @intCast(index_); + + const putDirectIndex = JSC.JSObject.putDirectIndex; + var bytes_ = optional_bytes orelse { + putDirectIndex(this.vm, this.object, index, JSC.JSValue.jsNull()); + return true; + }; + defer bytes_.deinit(); + const bytes = bytes_.slice(); + + switch (@as(types.Tag, @enumFromInt(this.fields[index].type_oid))) { + .number => { + switch (bytes.len) { + 0 => { + putDirectIndex(this.vm, index, JSC.JSValue.jsNull()); + }, + 2 => { + putDirectIndex(this.vm, index, JSC.JSValue.jsNumber(@bitCast(@as(i16, @bitCast(bytes[0..2]))))); + }, + 4 => { + putDirectIndex(this.vm, index, JSC.JSValue.jsNumber(@bitCast(@as(i32, @bitCast(bytes[0..4]))))); + }, + else => { + var eight: usize = 0; + @memcpy(@as(*[8]u8, @ptrCast(&eight))[0..bytes.len], bytes[0..@min(8, bytes.len)]); + eight = @byteSwap(eight); + putDirectIndex(this.vm, index, JSC.JSValue.jsNumber(@bitCast(@as(f64, eight)))); + }, + } + }, + .json => { + var str = bun.String.fromUTF8(bytes.slice()); + defer str.deref(); + putDirectIndex(this.vm, index, str.toJSForParseJSON(this.globalObject)); + }, + .boolean => { + putDirectIndex(this.vm, index, JSC.JSValue.jsBoolean(bytes.len > 0 and bytes[0] == 't')); + }, + .time, .datetime, .date => { + putDirectIndex(this.vm, index, JSC.JSValue.fromDateString(bytes_.sliceZ())); + }, + .bytea => { + putDirectIndex(this.vm, index, JSC.JSValue.createBuffer(this.globalObject, bytes, null)); + }, + else => { + var str = bun.String.fromUTF8(bytes.slice()); + defer str.deref(); + putDirectIndex(this.vm, index, str.toJS(this.globalObject)); + }, + } + return true; + } + }; + + pub fn on(this: *PostgresSQLConnection, comptime MessageType: @Type(.EnumLiteral), comptime Context: type, reader: protocol.NewReader(Context)) !void { + debug("on({s})", .{@typeName(MessageType)}); + + switch (comptime MessageType) { + .DataRow => { + var request = this.current() orelse return error.ExpectedRequest; + var statement = request.statement orelse return error.ExpectedStatement; + var structure = statement.structure(this.globalObject); + std.debug.assert(!structure.isEmptyOrUndefinedOrNull()); + + var row = JSC.JSObject.uninitialized(structure.asCell(), this.globalObject); + var putter = CellPutter{ + .object = row, + .vm = this.globalObject.vm(), + .globalObject = this.globalObject, + .fields = statement.fields, + }; + try protocol.DataRow.decode( + &putter, + Context, + reader, + CellPutter.put, + ); + request.push(row); + }, + .CopyData => { + + }, + .ParameterStatus => {}, + .ReadyForQuery => {}, + .CommandComplete => {}, + .BindComplete => {}, + .ParseComplete => {}, + .ParameterDescription => {}, + .RowDescription => {}, + .Authentication => {}, + .NoData => {}, + .BackendKeyData => {}, + .ErrorResponse => {}, + .PortalSuspended => {}, + .CloseComplete => {}, + .CopyInResponse => {}, + .NoticeResponse => {}, + .EmptyQueryResponse => {}, + .CopyOutResponse => {}, + .CopyDone => {}, + .CopyBothResponse => {}, + } + } }; pub const PostgresSQLStatement = struct { - structure: JSC.Strong = .{}, - name: []const u8 = "", + cached_structure: JSC.Strong = .{}, ref_count: u32 = 0, fields: []const protocol.FieldDescription = &[_]protocol.FieldDescription{}, - + signature: Signature, pub fn ref(this: *@This()) void { std.debug.assert(this.ref_count > 0); this.ref_count += 1; @@ -1634,10 +1863,37 @@ pub const PostgresSQLStatement = struct { field.deinit(); } bun.default_allocator.free(this.fields); - this.structure.deinit(); - bun.default_allocator.free(this.name); + this.cached_structure.deinit(); + this.signature.deinit(); bun.default_allocator.destroy(this); } + + pub fn structure(this: *PostgresSQLStatement, globalObject: *JSC.JSGlobalObject) JSC.JSValue { + return this.cached_structure.get() orelse { + var names = bun.default_allocator.alloc(bun.String, this.fields.len) catch return .undefined; + defer { + for (names) |*name| { + name.deref(); + } + bun.default_allocator.free(names); + } + for (this.fields, names) |*field, *name| { + name.* = String.createAtomIfPossible(field.name); + } + var structure_ = JSC.JSObject.createStructure( + globalObject, + @truncate(this.fields.len), + names.ptr, + ); + this.cached_structure.set(globalObject, structure_); + return structure_; + }; + } + + pub const Row = struct { + object: JSC.JSValue, + statement: *PostgresSQLStatement, + }; }; pub const Status = enum { @@ -1646,3 +1902,63 @@ pub const Status = enum { connected, failed, }; + +const Signature = struct { + fields: []const i32, + name: []const u8, + query: []const u8, + + pub fn deinit(this: *Signature) void { + bun.default_allocator.free(this.fields); + bun.default_allocator.free(this.name); + bun.default_allocator.free(this.query); + } + + pub fn hash(this: *const Signature) u64 { + var hasher = std.hash.Wyhash.init(0); + defer hasher.deinit(); + hasher.update(this.name); + hasher.update(std.mem.sliceAsBytes(this.fields)); + return hasher.final(); + } + + pub fn generate(globalObject: *JSC.JSGlobalObject, query: []const u8, array_value: JSC.JSValue) !Signature { + var fields = std.ArrayList(i32).init(bun.default_allocator); + var name = try std.ArrayList(u8).initCapacity(bun.default_allocator, query.len); + + errdefer { + fields.deinit(); + name.deinit(); + } + + var iter = JSC.JSArrayIterator.init(array_value, globalObject); + + while (iter.next()) |value| { + if (value.isUndefinedOrNull()) { + try fields.append(@byteSwap(-1)); + try name.append(".null"); + continue; + } + + const tag = types.Tag.fromJS(globalObject, value); + try fields.append(@byteSwap(@intFromEnum(tag))); + switch (tag) { + .number => try name.append(".number"), + .json => try name.append(".json"), + .boolean => try name.append(".boolean"), + .date => try name.append(".date"), + .datetime => try name.append(".datetime"), + .time => try name.append(".time"), + .bytea => try name.append(".bytea"), + .bigint => try name.append(".bigint"), + else => try name.append(".string"), + } + } + + return Signature{ + .name = name.items, + .fields = fields.items, + .query = try bun.default_allocator.dupe(u8, query), + }; + } +}; |