diff options
Diffstat (limited to 'src/bun.js/api/postgres.zig')
-rw-r--r-- | src/bun.js/api/postgres.zig | 425 |
1 files changed, 425 insertions, 0 deletions
diff --git a/src/bun.js/api/postgres.zig b/src/bun.js/api/postgres.zig new file mode 100644 index 000000000..bd45ba7cb --- /dev/null +++ b/src/bun.js/api/postgres.zig @@ -0,0 +1,425 @@ +const Bun = @This(); +const default_allocator = @import("bun").default_allocator; +const bun = @import("bun"); +const Environment = bun.Environment; +const NetworkThread = @import("bun").HTTP.NetworkThread; +const Global = bun.Global; +const strings = bun.strings; +const string = bun.string; +const Output = @import("bun").Output; +const MutableString = @import("bun").MutableString; +const std = @import("std"); +const Allocator = std.mem.Allocator; +const JSC = @import("bun").JSC; +const JSValue = JSC.JSValue; +const JSGlobalObject = JSC.JSGlobalObject; + +const uws = bun.uws; +const Socket = uws.NewSocketHandler(false); +const SocketContext = uws.SocketContext; +const Messages = @import("./postgres_messages.zig"); + +const ErrorCode = enum(i32) { + cancel, + invalid_response, + timeout, + closed, + failed_to_write, + failed_to_connect, + failed_to_allocate_memory, + invalid_utf8, + ended, + unknown, + + pub const status = bun.enumMap(ErrorCode, .{ + .{ .cancel, "cancel" }, + .{ .invalid_response, "invalidResponse" }, + .{ .timeout, "timeout" }, + .{ .closed, "closed" }, + .{ .failed_to_write, "failedToWrite" }, + .{ .failed_to_connect, "failedToConnect" }, + .{ .failed_to_allocate_memory, "failedToAllocateMemory" }, + .{ .invalid_utf8, "invalidUtf8" }, + .{ .ended, "ended" }, + .{ .unknown, "unknown" }, + }); + + pub const code = bun.enumMap(ErrorCode, .{ + .{ .cancel, "POSTGRES_ERROR_CANCEL" }, + .{ .invalid_response, "POSTGRES_ERROR_INVALID_RESPONSE" }, + .{ .timeout, "POSTGRES_ERROR_TIMEOUT" }, + .{ .closed, "POSTGRES_ERROR_CLOSED" }, + .{ .failed_to_write, "POSTGRES_ERROR_FAILED_TO_WRITE" }, + .{ .failed_to_connect, "POSTGRES_ERROR_FAILED_TO_CONNECT" }, + .{ .failed_to_allocate_memory, "POSTGRES_ERROR_FAILED_TO_ALLOCATE_MEMORY" }, + .{ .invalid_utf8, "POSTGRES_ERROR_INVALID_UTF8" }, + .{ .ended, "POSTGRES_ERROR_ENDED" }, + .{ .unknown, "POSTGRES_ERROR_UNKNOWN" }, + }); + + pub const label = bun.enumMap(ErrorCode, .{ + .{ .cancel, "The connection was cancelled" }, + .{ .invalid_response, "The connection has an invalid response" }, + .{ .timeout, "The connection timed out" }, + .{ .closed, "The connection was closed" }, + .{ .failed_to_write, "The connection failed to write" }, + .{ .failed_to_connect, "The connection failed to connect" }, + .{ .failed_to_allocate_memory, "Failed to allocate memory" }, + .{ .invalid_utf8, "Received invalid UTF-8" }, + .{ .ended, "The connection was ended" }, + .{ .unknown, "An unknown error occurred" }, + }); + + pub fn toErrorInstance( + this: ErrorCode, + globalObject: *JSC.JSGlobalObject, + ) JSC.JSValue { + var instance = globalObject.createErrorInstance( + "{s}", + .{this.label()}, + ); + instance.put("code", JSC.ZigString.init(this.code()).toValueGC(globalObject)); + instance.put("name", JSC.ZigString.static("PostgresError").toValueGC(globalObject)); + return instance; + } +}; + +const ConnectionOptions = union(enum) { + pub const TCP = struct { + hostname: []const u8 = "localhost", + port: u16 = 5432, + database: []const u8 = "postgres", + user: []const u8 = "", + password: []const u8 = "", + }; + tcp: TCP, + tls: struct { + tcp: TCP, + }, +}; + +pub const PostgresData = struct { + tcp_ctx: ?*uws.SocketContext = null, +}; + +pub const Protocol = struct {}; + +pub const PostgresConnection = struct { + const log = Output.scoped(.PostgresConnection, false); + + tcp: Socket, + poll_ref: JSC.PollRef = .{}, + + pub fn connect(globalThis: *JSC.JSGlobalObject, db: *PostgresSQLDatabase, options: ConnectionOptions) !void { + autoRegister(globalThis); + log("connect {s}:{d}", .{ options.tcp.hostname, options.tcp.port }); + const socket = Socket.connectAnon( + options.tcp.hostname, + options.tcp.port, + globalThis.bunVM().rareData().postgres_data.tcp_ctx.?, + &db.connection, + ) orelse { + return error.FailedToConnect; + }; + db.connection.tcp = socket; + } + + pub fn closeGracefully(this: *PostgresConnection) void { + this.tcp.close(0, null); // todo + } + + pub fn autoRegister(global: *JSC.JSGlobalObject) void { + var vm = global.bunVM(); + + if (vm.rareData().postgres_data.tcp_ctx == null) { + var opts: uws.us_socket_context_options_t = undefined; + @memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t)); + var ctx = uws.us_create_socket_context(0, vm.uws_event_loop.?, @sizeOf(usize), opts).?; + vm.rareData().postgres_data.tcp_ctx = ctx; + Socket.configure( + ctx, + false, + PostgresConnection, + struct { + pub const onClose = PostgresConnection.onClose; + pub const onData = PostgresConnection.onData; + pub const onWritable = PostgresConnection.onWritable; + pub const onTimeout = PostgresConnection.onTimeout; + pub const onConnectError = PostgresConnection.onConnectError; + pub const onEnd = PostgresConnection.onEnd; + }, + ); + } + } + + pub inline fn database(this: *PostgresConnection) *PostgresSQLDatabase { + return @fieldParentPtr(PostgresSQLDatabase, "connection", this); + } + + pub fn onWritable( + this: *PostgresConnection, + socket: Socket, + ) void { + std.debug.assert(socket.socket == this.tcp.socket); + // if (this.to_send.len == 0) + // return; + + // const wrote = socket.write(this.to_send, true); + // if (wrote < 0) { + // this.terminate(ErrorCode.failed_to_write); + // return; + // } + // this.to_send = this.to_send[@min(@intCast(usize, wrote), this.to_send.len)..]; + } + pub fn onTimeout( + this: *PostgresConnection, + _: Socket, + ) void { + this.terminate(ErrorCode.timeout); + } + pub fn onConnectError(this: *PostgresConnection, _: Socket, _: c_int) void { + this.terminate(ErrorCode.failed_to_connect); + } + + pub fn onEnd(this: *PostgresConnection, socket: Socket) void { + log("onEnd", .{}); + std.debug.assert(socket.socket == this.tcp.socket); + this.terminate(ErrorCode.ended); + } + + pub fn onData(_: *PostgresConnection, _: Socket, data: []const u8) void { + log("onData: {d}", .{data.len}); + } + + pub fn onClose(this: *PostgresConnection, _: Socket, _: c_int, _: ?*anyopaque) void { + log("onClose", .{}); + this.terminate(ErrorCode.closed); + } + + pub fn onOpen(this: *PostgresConnection, socket: Socket) void { + log("onOpen", .{}); + std.debug.assert(socket.socket == this.tcp.socket); + } + + pub fn terminate(this: *PostgresConnection, code: ErrorCode) void { + log("terminate - {s}", .{code.code()}); + this.poll_ref.disable(); + + if (this.tcp.isEstablished() and !this.tcp.isClosed()) { + this.tcp.ext(?*anyopaque).?.* = null; + this.tcp.close(0, null); + } + + this.database().terminate(code); + } +}; + +const PendingQuery = struct { + resolve: JSC.JSValue, + reject: JSC.JSValue, + query: JSC.ZigString.Slice, +}; + +pub const PostgresSQLDatabase = struct { + const log = Output.scoped(.PostgresSQLDatabase, false); + pub usingnamespace JSC.Codegen.JSPostgresSQLDatabase; + arena: std.heap.ArenaAllocator, + connection: PostgresConnection, + options: ConnectionOptions, + this_jsvalue: JSC.JSValue = .zero, + globalObject: *JSC.JSGlobalObject, + status: Status = .connecting, + has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + + close_status: ErrorCode = .unknown, + + pending_queries: std.ArrayListUnmanaged(PendingQuery) = .{}, + + pub const Status = enum { + connecting, + connected, + closing, + closed, + + pub const label = bun.enumMap(Status, .{ + .{ .connecting, "connecting" }, + .{ .connected, "connected" }, + .{ .closing, "closing" }, + .{ .closed, "closed" }, + }); + }; + + pub fn hasPendingActivity(this: *PostgresSQLDatabase) callconv(.C) bool { + @fence(.Acquire); + return this.has_pending_activity.load(.Acquire); + } + + pub fn getStatus(this: *PostgresSQLDatabase, globalThis: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + return JSC.ZigString.init(this.status.label()).toValueGC(globalThis); + } + + fn setStatus( + this: *PostgresSQLDatabase, + status: Status, + _: JSC.JSValue, + ) void { + this.status = status; + this.updateHasPendingData(); + if (status == .connected) {} + } + + pub fn updateHasPendingData(this: *PostgresSQLDatabase) void { + @fence(.Release); + this.has_pending_activity.store(this.status != .closed, .Release); + } + + pub fn terminate(this: *PostgresSQLDatabase, code: ErrorCode) void { + const js_value = this.this_jsvalue; + if (this.status == .connecting) { + this.setStatus(.closed, js_value); + return; + } + this.close_status = code; + + if (this.status == .closed) + return; + + this.setStatus(.closed, js_value); + } + + pub fn connect(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments_ = callframe.arguments(8); + const arguments: []const JSC.JSValue = arguments_.ptr[0..arguments_.len]; + + if (arguments.len < 1) { + globalObject.throwNotEnoughArguments("connect", 1, 0); + return .zero; + } + + if (arguments[0].isEmptyOrUndefinedOrNull()) { + globalObject.throwInvalidArgumentType("connect", "options", "url string or object"); + return .zero; + } + + var arena = std.heap.ArenaAllocator.init(globalObject.allocator()); + + var options = ConnectionOptions{ .tcp = .{} }; + + if (arguments[0].get(globalObject, "host")) |value| { + if (!value.isEmptyOrUndefinedOrNull()) { + const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory"); + if (str.len > 0) + options.tcp.hostname = str.slice(); + } + } + if (arguments[0].get(globalObject, "port")) |value| { + if (!value.isEmptyOrUndefinedOrNull()) { + const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory"); + if (str.len > 0) + options.tcp.port = std.fmt.parseInt(u16, str.slice(), 10) catch @panic("Error parsing port number"); + } + } + if (arguments[0].get(globalObject, "database")) |value| { + if (!value.isEmptyOrUndefinedOrNull()) { + const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory"); + if (str.len > 0) + options.tcp.database = str.slice(); + } + } + if (arguments[0].get(globalObject, "user")) |value| { + if (!value.isEmptyOrUndefinedOrNull()) { + const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory"); + if (str.len > 0) + options.tcp.user = str.slice(); + } + } + if (arguments[0].get(globalObject, "pass")) |value| { + if (!value.isEmptyOrUndefinedOrNull()) { + const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory"); + if (str.len > 0) + options.tcp.password = str.slice(); + } + } + // if (arguments[0].get(globalObject, "path")) |value| { + // if (!value.isEmptyOrUndefinedOrNull()) { + // const str = value.toSlice(globalObject).clone(arena.allocator()); + // if (str.len > 0) + // options.tcp.p = str.slice(); + // } + // } + var db = globalObject.allocator().create(PostgresSQLDatabase) catch |err| { + arena.deinit(); + globalObject.throwError(err, "failed to allocate db"); + return .zero; + }; + + const this = db.toJS(globalObject); + db.* = .{ + .this_jsvalue = this, + .options = options, + .status = .connecting, + .arena = arena, + .globalObject = globalObject, + .connection = undefined, + }; + PostgresSQLDatabase.onCloseSetCached(this, globalObject, arguments[0].get(globalObject, "onClose") orelse @panic("Expected onClose. Don't call this function outside of bun:sql.")); + PostgresSQLDatabase.onNoticeSetCached(this, globalObject, arguments[0].get(globalObject, "onNotice") orelse @panic("Expected onNotice. Don't call this function outside of bun:sql.")); + PostgresSQLDatabase.onOpenSetCached(this, globalObject, arguments[0].get(globalObject, "onOpen") orelse @panic("Expected onOpen. Don't call this function outside of bun:sql.")); + PostgresSQLDatabase.onTimeoutSetCached(this, globalObject, arguments[0].get(globalObject, "onTimeout") orelse @panic("Expected onTimeout. Don't call this function outside of bun:sql.")); + PostgresSQLDatabase.onDrainSetCached(this, globalObject, arguments[0].get(globalObject, "onDrain") orelse @panic("Expected onDrain. Don't call this function outside of bun:sql.")); + db.updateHasPendingData(); + PostgresConnection.connect(globalObject, db, options) catch |err| { + arena.deinit(); + globalObject.throwError(err, "failed to connect"); + return .zero; + }; + + return this; + } + + pub fn query(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue { + return JSC.JSValue.jsUndefined(); + } + + pub fn ref(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue { + return JSC.JSValue.jsUndefined(); + } + + pub fn unref(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue { + return JSC.JSValue.jsUndefined(); + } + + pub fn close(this: *PostgresSQLDatabase, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue { + if (this.status == .closed) { + return JSC.ZigString.init(this.close_status.label()).toValueGC(globalObject); + } + + if (this.status == .closing) { + return JSC.JSValue.jsUndefined(); + } + + std.debug.assert(!this.connection.tcp.isClosed()); + std.debug.assert(this.connection.tcp.isEstablished()); + std.debug.assert(!this.connection.tcp.isShutdown()); + + this.setStatus(.closing, this.this_jsvalue); + this.connection.closeGracefully(); + return JSC.JSValue.jsUndefined(); + } + + pub fn finalize(this: *PostgresSQLDatabase) callconv(.C) void { + this.deinit(); + } + + pub fn deinit(this: *PostgresSQLDatabase) void { + std.debug.assert(this.status == .closed); + this.arena.deinit(); + bun.default_allocator.destroy(this); + } +}; + +comptime { + @export(PostgresSQLDatabase.connect, .{ + .name = "Bun__PostgreSQL__connect", + }); +} |