aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/api/postgres.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/api/postgres.zig')
-rw-r--r--src/bun.js/api/postgres.zig425
1 files changed, 425 insertions, 0 deletions
diff --git a/src/bun.js/api/postgres.zig b/src/bun.js/api/postgres.zig
new file mode 100644
index 000000000..bd45ba7cb
--- /dev/null
+++ b/src/bun.js/api/postgres.zig
@@ -0,0 +1,425 @@
+const Bun = @This();
+const default_allocator = @import("bun").default_allocator;
+const bun = @import("bun");
+const Environment = bun.Environment;
+const NetworkThread = @import("bun").HTTP.NetworkThread;
+const Global = bun.Global;
+const strings = bun.strings;
+const string = bun.string;
+const Output = @import("bun").Output;
+const MutableString = @import("bun").MutableString;
+const std = @import("std");
+const Allocator = std.mem.Allocator;
+const JSC = @import("bun").JSC;
+const JSValue = JSC.JSValue;
+const JSGlobalObject = JSC.JSGlobalObject;
+
+const uws = bun.uws;
+const Socket = uws.NewSocketHandler(false);
+const SocketContext = uws.SocketContext;
+const Messages = @import("./postgres_messages.zig");
+
+const ErrorCode = enum(i32) {
+ cancel,
+ invalid_response,
+ timeout,
+ closed,
+ failed_to_write,
+ failed_to_connect,
+ failed_to_allocate_memory,
+ invalid_utf8,
+ ended,
+ unknown,
+
+ pub const status = bun.enumMap(ErrorCode, .{
+ .{ .cancel, "cancel" },
+ .{ .invalid_response, "invalidResponse" },
+ .{ .timeout, "timeout" },
+ .{ .closed, "closed" },
+ .{ .failed_to_write, "failedToWrite" },
+ .{ .failed_to_connect, "failedToConnect" },
+ .{ .failed_to_allocate_memory, "failedToAllocateMemory" },
+ .{ .invalid_utf8, "invalidUtf8" },
+ .{ .ended, "ended" },
+ .{ .unknown, "unknown" },
+ });
+
+ pub const code = bun.enumMap(ErrorCode, .{
+ .{ .cancel, "POSTGRES_ERROR_CANCEL" },
+ .{ .invalid_response, "POSTGRES_ERROR_INVALID_RESPONSE" },
+ .{ .timeout, "POSTGRES_ERROR_TIMEOUT" },
+ .{ .closed, "POSTGRES_ERROR_CLOSED" },
+ .{ .failed_to_write, "POSTGRES_ERROR_FAILED_TO_WRITE" },
+ .{ .failed_to_connect, "POSTGRES_ERROR_FAILED_TO_CONNECT" },
+ .{ .failed_to_allocate_memory, "POSTGRES_ERROR_FAILED_TO_ALLOCATE_MEMORY" },
+ .{ .invalid_utf8, "POSTGRES_ERROR_INVALID_UTF8" },
+ .{ .ended, "POSTGRES_ERROR_ENDED" },
+ .{ .unknown, "POSTGRES_ERROR_UNKNOWN" },
+ });
+
+ pub const label = bun.enumMap(ErrorCode, .{
+ .{ .cancel, "The connection was cancelled" },
+ .{ .invalid_response, "The connection has an invalid response" },
+ .{ .timeout, "The connection timed out" },
+ .{ .closed, "The connection was closed" },
+ .{ .failed_to_write, "The connection failed to write" },
+ .{ .failed_to_connect, "The connection failed to connect" },
+ .{ .failed_to_allocate_memory, "Failed to allocate memory" },
+ .{ .invalid_utf8, "Received invalid UTF-8" },
+ .{ .ended, "The connection was ended" },
+ .{ .unknown, "An unknown error occurred" },
+ });
+
+ pub fn toErrorInstance(
+ this: ErrorCode,
+ globalObject: *JSC.JSGlobalObject,
+ ) JSC.JSValue {
+ var instance = globalObject.createErrorInstance(
+ "{s}",
+ .{this.label()},
+ );
+ instance.put("code", JSC.ZigString.init(this.code()).toValueGC(globalObject));
+ instance.put("name", JSC.ZigString.static("PostgresError").toValueGC(globalObject));
+ return instance;
+ }
+};
+
+const ConnectionOptions = union(enum) {
+ pub const TCP = struct {
+ hostname: []const u8 = "localhost",
+ port: u16 = 5432,
+ database: []const u8 = "postgres",
+ user: []const u8 = "",
+ password: []const u8 = "",
+ };
+ tcp: TCP,
+ tls: struct {
+ tcp: TCP,
+ },
+};
+
+pub const PostgresData = struct {
+ tcp_ctx: ?*uws.SocketContext = null,
+};
+
+pub const Protocol = struct {};
+
+pub const PostgresConnection = struct {
+ const log = Output.scoped(.PostgresConnection, false);
+
+ tcp: Socket,
+ poll_ref: JSC.PollRef = .{},
+
+ pub fn connect(globalThis: *JSC.JSGlobalObject, db: *PostgresSQLDatabase, options: ConnectionOptions) !void {
+ autoRegister(globalThis);
+ log("connect {s}:{d}", .{ options.tcp.hostname, options.tcp.port });
+ const socket = Socket.connectAnon(
+ options.tcp.hostname,
+ options.tcp.port,
+ globalThis.bunVM().rareData().postgres_data.tcp_ctx.?,
+ &db.connection,
+ ) orelse {
+ return error.FailedToConnect;
+ };
+ db.connection.tcp = socket;
+ }
+
+ pub fn closeGracefully(this: *PostgresConnection) void {
+ this.tcp.close(0, null); // todo
+ }
+
+ pub fn autoRegister(global: *JSC.JSGlobalObject) void {
+ var vm = global.bunVM();
+
+ if (vm.rareData().postgres_data.tcp_ctx == null) {
+ var opts: uws.us_socket_context_options_t = undefined;
+ @memset(@ptrCast([*]u8, &opts), 0, @sizeOf(uws.us_socket_context_options_t));
+ var ctx = uws.us_create_socket_context(0, vm.uws_event_loop.?, @sizeOf(usize), opts).?;
+ vm.rareData().postgres_data.tcp_ctx = ctx;
+ Socket.configure(
+ ctx,
+ false,
+ PostgresConnection,
+ struct {
+ pub const onClose = PostgresConnection.onClose;
+ pub const onData = PostgresConnection.onData;
+ pub const onWritable = PostgresConnection.onWritable;
+ pub const onTimeout = PostgresConnection.onTimeout;
+ pub const onConnectError = PostgresConnection.onConnectError;
+ pub const onEnd = PostgresConnection.onEnd;
+ },
+ );
+ }
+ }
+
+ pub inline fn database(this: *PostgresConnection) *PostgresSQLDatabase {
+ return @fieldParentPtr(PostgresSQLDatabase, "connection", this);
+ }
+
+ pub fn onWritable(
+ this: *PostgresConnection,
+ socket: Socket,
+ ) void {
+ std.debug.assert(socket.socket == this.tcp.socket);
+ // if (this.to_send.len == 0)
+ // return;
+
+ // const wrote = socket.write(this.to_send, true);
+ // if (wrote < 0) {
+ // this.terminate(ErrorCode.failed_to_write);
+ // return;
+ // }
+ // this.to_send = this.to_send[@min(@intCast(usize, wrote), this.to_send.len)..];
+ }
+ pub fn onTimeout(
+ this: *PostgresConnection,
+ _: Socket,
+ ) void {
+ this.terminate(ErrorCode.timeout);
+ }
+ pub fn onConnectError(this: *PostgresConnection, _: Socket, _: c_int) void {
+ this.terminate(ErrorCode.failed_to_connect);
+ }
+
+ pub fn onEnd(this: *PostgresConnection, socket: Socket) void {
+ log("onEnd", .{});
+ std.debug.assert(socket.socket == this.tcp.socket);
+ this.terminate(ErrorCode.ended);
+ }
+
+ pub fn onData(_: *PostgresConnection, _: Socket, data: []const u8) void {
+ log("onData: {d}", .{data.len});
+ }
+
+ pub fn onClose(this: *PostgresConnection, _: Socket, _: c_int, _: ?*anyopaque) void {
+ log("onClose", .{});
+ this.terminate(ErrorCode.closed);
+ }
+
+ pub fn onOpen(this: *PostgresConnection, socket: Socket) void {
+ log("onOpen", .{});
+ std.debug.assert(socket.socket == this.tcp.socket);
+ }
+
+ pub fn terminate(this: *PostgresConnection, code: ErrorCode) void {
+ log("terminate - {s}", .{code.code()});
+ this.poll_ref.disable();
+
+ if (this.tcp.isEstablished() and !this.tcp.isClosed()) {
+ this.tcp.ext(?*anyopaque).?.* = null;
+ this.tcp.close(0, null);
+ }
+
+ this.database().terminate(code);
+ }
+};
+
+const PendingQuery = struct {
+ resolve: JSC.JSValue,
+ reject: JSC.JSValue,
+ query: JSC.ZigString.Slice,
+};
+
+pub const PostgresSQLDatabase = struct {
+ const log = Output.scoped(.PostgresSQLDatabase, false);
+ pub usingnamespace JSC.Codegen.JSPostgresSQLDatabase;
+ arena: std.heap.ArenaAllocator,
+ connection: PostgresConnection,
+ options: ConnectionOptions,
+ this_jsvalue: JSC.JSValue = .zero,
+ globalObject: *JSC.JSGlobalObject,
+ status: Status = .connecting,
+ has_pending_activity: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+
+ close_status: ErrorCode = .unknown,
+
+ pending_queries: std.ArrayListUnmanaged(PendingQuery) = .{},
+
+ pub const Status = enum {
+ connecting,
+ connected,
+ closing,
+ closed,
+
+ pub const label = bun.enumMap(Status, .{
+ .{ .connecting, "connecting" },
+ .{ .connected, "connected" },
+ .{ .closing, "closing" },
+ .{ .closed, "closed" },
+ });
+ };
+
+ pub fn hasPendingActivity(this: *PostgresSQLDatabase) callconv(.C) bool {
+ @fence(.Acquire);
+ return this.has_pending_activity.load(.Acquire);
+ }
+
+ pub fn getStatus(this: *PostgresSQLDatabase, globalThis: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue {
+ return JSC.ZigString.init(this.status.label()).toValueGC(globalThis);
+ }
+
+ fn setStatus(
+ this: *PostgresSQLDatabase,
+ status: Status,
+ _: JSC.JSValue,
+ ) void {
+ this.status = status;
+ this.updateHasPendingData();
+ if (status == .connected) {}
+ }
+
+ pub fn updateHasPendingData(this: *PostgresSQLDatabase) void {
+ @fence(.Release);
+ this.has_pending_activity.store(this.status != .closed, .Release);
+ }
+
+ pub fn terminate(this: *PostgresSQLDatabase, code: ErrorCode) void {
+ const js_value = this.this_jsvalue;
+ if (this.status == .connecting) {
+ this.setStatus(.closed, js_value);
+ return;
+ }
+ this.close_status = code;
+
+ if (this.status == .closed)
+ return;
+
+ this.setStatus(.closed, js_value);
+ }
+
+ pub fn connect(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ const arguments_ = callframe.arguments(8);
+ const arguments: []const JSC.JSValue = arguments_.ptr[0..arguments_.len];
+
+ if (arguments.len < 1) {
+ globalObject.throwNotEnoughArguments("connect", 1, 0);
+ return .zero;
+ }
+
+ if (arguments[0].isEmptyOrUndefinedOrNull()) {
+ globalObject.throwInvalidArgumentType("connect", "options", "url string or object");
+ return .zero;
+ }
+
+ var arena = std.heap.ArenaAllocator.init(globalObject.allocator());
+
+ var options = ConnectionOptions{ .tcp = .{} };
+
+ if (arguments[0].get(globalObject, "host")) |value| {
+ if (!value.isEmptyOrUndefinedOrNull()) {
+ const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory");
+ if (str.len > 0)
+ options.tcp.hostname = str.slice();
+ }
+ }
+ if (arguments[0].get(globalObject, "port")) |value| {
+ if (!value.isEmptyOrUndefinedOrNull()) {
+ const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory");
+ if (str.len > 0)
+ options.tcp.port = std.fmt.parseInt(u16, str.slice(), 10) catch @panic("Error parsing port number");
+ }
+ }
+ if (arguments[0].get(globalObject, "database")) |value| {
+ if (!value.isEmptyOrUndefinedOrNull()) {
+ const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory");
+ if (str.len > 0)
+ options.tcp.database = str.slice();
+ }
+ }
+ if (arguments[0].get(globalObject, "user")) |value| {
+ if (!value.isEmptyOrUndefinedOrNull()) {
+ const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory");
+ if (str.len > 0)
+ options.tcp.user = str.slice();
+ }
+ }
+ if (arguments[0].get(globalObject, "pass")) |value| {
+ if (!value.isEmptyOrUndefinedOrNull()) {
+ const str = value.toSlice(globalObject, arena.allocator()).clone(arena.allocator()) catch @panic("Out of memory");
+ if (str.len > 0)
+ options.tcp.password = str.slice();
+ }
+ }
+ // if (arguments[0].get(globalObject, "path")) |value| {
+ // if (!value.isEmptyOrUndefinedOrNull()) {
+ // const str = value.toSlice(globalObject).clone(arena.allocator());
+ // if (str.len > 0)
+ // options.tcp.p = str.slice();
+ // }
+ // }
+ var db = globalObject.allocator().create(PostgresSQLDatabase) catch |err| {
+ arena.deinit();
+ globalObject.throwError(err, "failed to allocate db");
+ return .zero;
+ };
+
+ const this = db.toJS(globalObject);
+ db.* = .{
+ .this_jsvalue = this,
+ .options = options,
+ .status = .connecting,
+ .arena = arena,
+ .globalObject = globalObject,
+ .connection = undefined,
+ };
+ PostgresSQLDatabase.onCloseSetCached(this, globalObject, arguments[0].get(globalObject, "onClose") orelse @panic("Expected onClose. Don't call this function outside of bun:sql."));
+ PostgresSQLDatabase.onNoticeSetCached(this, globalObject, arguments[0].get(globalObject, "onNotice") orelse @panic("Expected onNotice. Don't call this function outside of bun:sql."));
+ PostgresSQLDatabase.onOpenSetCached(this, globalObject, arguments[0].get(globalObject, "onOpen") orelse @panic("Expected onOpen. Don't call this function outside of bun:sql."));
+ PostgresSQLDatabase.onTimeoutSetCached(this, globalObject, arguments[0].get(globalObject, "onTimeout") orelse @panic("Expected onTimeout. Don't call this function outside of bun:sql."));
+ PostgresSQLDatabase.onDrainSetCached(this, globalObject, arguments[0].get(globalObject, "onDrain") orelse @panic("Expected onDrain. Don't call this function outside of bun:sql."));
+ db.updateHasPendingData();
+ PostgresConnection.connect(globalObject, db, options) catch |err| {
+ arena.deinit();
+ globalObject.throwError(err, "failed to connect");
+ return .zero;
+ };
+
+ return this;
+ }
+
+ pub fn query(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn ref(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn unref(_: *PostgresSQLDatabase, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn close(this: *PostgresSQLDatabase, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ if (this.status == .closed) {
+ return JSC.ZigString.init(this.close_status.label()).toValueGC(globalObject);
+ }
+
+ if (this.status == .closing) {
+ return JSC.JSValue.jsUndefined();
+ }
+
+ std.debug.assert(!this.connection.tcp.isClosed());
+ std.debug.assert(this.connection.tcp.isEstablished());
+ std.debug.assert(!this.connection.tcp.isShutdown());
+
+ this.setStatus(.closing, this.this_jsvalue);
+ this.connection.closeGracefully();
+ return JSC.JSValue.jsUndefined();
+ }
+
+ pub fn finalize(this: *PostgresSQLDatabase) callconv(.C) void {
+ this.deinit();
+ }
+
+ pub fn deinit(this: *PostgresSQLDatabase) void {
+ std.debug.assert(this.status == .closed);
+ this.arena.deinit();
+ bun.default_allocator.destroy(this);
+ }
+};
+
+comptime {
+ @export(PostgresSQLDatabase.connect, .{
+ .name = "Bun__PostgreSQL__connect",
+ });
+}