diff options
author | 2023-03-13 20:42:35 -0300 | |
---|---|---|
committer | 2023-03-13 16:42:35 -0700 | |
commit | ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8 (patch) | |
tree | 9f710fcd00a4c2987212ab961bd57e67c0c60729 /src/bun.js | |
parent | 8d320d137393d88ada6961dc0781de9054a0c453 (diff) | |
download | bun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.tar.gz bun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.tar.zst bun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.zip |
feat(net.createServer) and adds socket.connect IPC support (#2337)
* added net.Server
* fix fmt
* fix cast and move test
* fix node-net.tests.ts but breaks server data receive
* fix end and close only emitting when data or pipe was added
* fix socket starting paused
* add echo test
* fix fmt
* fix fmt
* on close if the socket is paused, keep paused until the user resumes it to match node behavior
* resume on connect
* fix getConnections, error on close, make _Handler private and create symbols for SocketServerHandlers
* add path support (IPC)
* fix unix domains support, add connect unix tests
* fix unix socket tests
* emit listening and listening error on next tick
* fix connection flask test
* try fix connect error on macos test
* merge connection and drop tests
* added exclusive option
* fix socket.zig fmt
* fix validation for options and add test for it
* pass prettier fmt
Diffstat (limited to 'src/bun.js')
-rw-r--r-- | src/bun.js/api/bun/socket.zig | 22 | ||||
-rw-r--r-- | src/bun.js/net.exports.js | 303 |
2 files changed, 316 insertions, 9 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index e18d7f9e5..df71bfb23 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -218,6 +218,7 @@ pub const SocketConfig = struct { ssl: ?JSC.API.ServerConfig.SSLConfig = null, handlers: Handlers, default_data: JSC.JSValue = .zero, + exclusive: bool = false, pub fn fromJS( opts: JSC.JSValue, @@ -226,6 +227,7 @@ pub const SocketConfig = struct { ) ?SocketConfig { var hostname_or_unix: JSC.ZigString.Slice = JSC.ZigString.Slice.empty; var port: ?u16 = null; + var exclusive = false; var ssl: ?JSC.API.ServerConfig.SSLConfig = null; var default_data = JSValue.zero; @@ -265,6 +267,10 @@ pub const SocketConfig = struct { } } + if (opts.getTruthy(globalObject, "exclusive")) |_| { + exclusive = true; + } + if (opts.getTruthy(globalObject, "hostname") orelse opts.getTruthy(globalObject, "host")) |hostname| { if (!hostname.isString()) { exception.* = JSC.toInvalidArguments("Expected \"hostname\" to be a string", .{}, globalObject).asObjectRef(); @@ -324,6 +330,7 @@ pub const SocketConfig = struct { .ssl = ssl, .handlers = handlers, .default_data = default_data, + .exclusive = exclusive, }; } }; @@ -432,11 +439,12 @@ pub const Listener = struct { var port = socket_config.port; var ssl = socket_config.ssl; var handlers = socket_config.handlers; + const exclusive = socket_config.exclusive; handlers.is_server = true; const ssl_enabled = ssl != null; - const socket_flags: i32 = 0; + const socket_flags: i32 = if (exclusive) 1 else 0; const ctx_opts: uws.us_socket_context_options_t = brk: { var sock_ctx: uws.us_socket_context_options_t = undefined; @@ -513,7 +521,7 @@ pub const Listener = struct { ); } - const connection: Listener.UnixOrHost = if (port) |port_| .{ + var connection: Listener.UnixOrHost = if (port) |port_| .{ .host = .{ .host = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), .port = port_ }, } else .{ .unix = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), @@ -524,7 +532,8 @@ pub const Listener = struct { .host => |c| { var host = bun.default_allocator.dupeZ(u8, c.host) catch unreachable; defer bun.default_allocator.free(host); - break :brk uws.us_socket_context_listen( + + const socket = uws.us_socket_context_listen( @boolToInt(ssl_enabled), socket_context, normalizeHost(@as([:0]const u8, host)), @@ -532,6 +541,11 @@ pub const Listener = struct { socket_flags, 8, ); + // should return the assigned port + if (socket) |s| { + connection.host.port = @intCast(u16, s.getLocalPort(ssl_enabled)); + } + break :brk socket; }, .unix => |u| { var host = bun.default_allocator.dupeZ(u8, u) catch unreachable; @@ -804,8 +818,6 @@ pub const Listener = struct { default_data.ensureStillAlive(); - // const socket_flags: i32 = 0; - var handlers_ptr = handlers.vm.allocator.create(Handlers) catch @panic("OOM"); handlers_ptr.* = handlers; handlers_ptr.is_server = false; diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js index 6ed03d2b7..14973040e 100644 --- a/src/bun.js/net.exports.js +++ b/src/bun.js/net.exports.js @@ -56,8 +56,13 @@ export function isIP(s) { const { Bun, createFIFO, Object } = import.meta.primordials; const { connect: bunConnect } = Bun; const { Duplex } = import.meta.require("node:stream"); +const { EventEmitter } = import.meta.require("node:events"); const bunTlsSymbol = Symbol.for("::buntls::"); +const bunSocketServerHandlers = Symbol.for("::bunsocket_serverhandlers::"); +const bunSocketServerConnections = Symbol.for("::bunnetserverconnections::"); +const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::"); + var SocketClass; export const Socket = (function (InternalSocket) { SocketClass = InternalSocket; @@ -116,7 +121,7 @@ export const Socket = (function (InternalSocket) { self.emit("connect"); Socket.#Drain(socket); }, - timeout() { + timeout(socket) { const self = socket.data; self.emit("timeout"); }, @@ -152,6 +157,57 @@ export const Socket = (function (InternalSocket) { } } + static [bunSocketServerHandlers] = { + data: Socket.#Handlers.data, + close(socket) { + Socket.#Handlers.close(socket); + this.data[bunSocketServerConnections]--; + }, + end(socket) { + Socket.#Handlers.end(socket); + this.data[bunSocketServerConnections]--; + }, + open(socket) { + const self = this.data; + const options = self[bunSocketServerOptions]; + const { pauseOnConnect, connectionListener } = options; + const _socket = new Socket(options); + _socket.#attach(this.localPort, socket); + if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { + const data = { + localAddress: _socket.localAddress, + localPort: _socket.localPort, + localFamily: _socket.localFamily, + remoteAddress: _socket.remoteAddress, + remotePort: _socket.remotePort, + remoteFamily: _socket.remoteFamily || "IPv4", + }; + socket.end(); + + self.emit("drop", data); + return; + } + // the duplex implementation start paused, so we resume when pauseOnConnect is falsy + if (!pauseOnConnect) { + _socket.resume(); + } + + self[bunSocketServerConnections]++; + if (typeof connectionListener == "function") { + connectionListener(_socket); + } + self.emit("connection", _socket); + }, + error(socket, error) { + Socket.#Handlers.error(socket, error); + this.data.emit("error", error); + }, + timeout: Socket.#Handlers.timeout, + connectError: Socket.#Handlers.connectError, + drain: Socket.#Handlers.drain, + binaryType: "buffer", + }; + bytesRead = 0; bytesWritten = 0; #closed = false; @@ -190,12 +246,27 @@ export const Socket = (function (InternalSocket) { return this.writableLength; } + #attach(port, socket) { + this.remotePort = port; + socket.data = this; + socket.timeout(this.timeout); + socket.ref(); + this.#socket = socket; + this.connecting = false; + this.emit("connect"); + Socket.#Drain(socket); + } + connect(port, host, connectListener) { - // TODO support IPC sockets var path; - if (arguments.length === 1 && typeof port === "string") { + if (typeof port === "string") { path = port; port = undefined; + + if (typeof host === "function") { + connectListener = host; + host = undefined; + } } else if (typeof host == "function") { if (typeof port === "string") { path = port; @@ -228,6 +299,7 @@ export const Socket = (function (InternalSocket) { if (typeof bunTLS === "function") { tls = bunTLS.call(this, port, host); } + bunConnect( path ? { @@ -296,7 +368,7 @@ export const Socket = (function (InternalSocket) { } get remoteAddress() { - return this.#socket.remoteAddress; + return this.#socket?.remoteAddress; } get remoteFamily() { @@ -368,7 +440,230 @@ export function createConnection(port, host, connectListener) { export const connect = createConnection; +class Server extends EventEmitter { + #server; + #listening = false; + [bunSocketServerConnections] = 0; + [bunSocketServerOptions]; + maxConnections = 0; + + constructor(options, connectionListener) { + super(); + + if (typeof options === "function") { + connectionListener = options; + options = {}; + } else if (options == null || typeof options === "object") { + options = { ...options }; + } else { + throw new Error("bun-net-polyfill: invalid arguments"); + } + + const { maxConnections } = options; + this.maxConnections = Number.isSafeInteger(maxConnections) && maxConnections > 0 ? maxConnections : 0; + + options.connectionListener = connectionListener; + this[bunSocketServerOptions] = options; + } + + ref() { + this.#server?.ref(); + return this; + } + + unref() { + this.#server?.unref(); + return this; + } + + close(callback) { + if (this.#server) { + this.#server.stop(true); + this.#server = null; + this.#listening = false; + this[bunSocketServerConnections] = 0; + this.emit("close"); + if (typeof callback === "function") { + callback(); + } + + return this; + } + + if (typeof callback === "function") { + const error = new Error("Server is not running"); + error.code = "ERR_SERVER_NOT_RUNNING"; + callback(error); + } + return this; + } + + address() { + const server = this.#server; + if (server) { + const unix = server.unix; + if (unix) { + return unix; + } + + //TODO: fix adress when host is passed + let address = server.hostname; + const type = isIP(address); + const port = server.port; + if (typeof port === "number") { + return { + port, + address, + family: type ? `IPv${type}` : undefined, + }; + } + if (type) { + return { + address, + family: type ? `IPv${type}` : undefined, + }; + } + + return address; + } + return null; + } + + getConnections(callback) { + if (typeof callback === "function") { + //in Bun case we will never error on getConnections + //node only errors if in the middle of the couting the server got disconnected, what never happens in Bun + //if disconnected will only pass null as well and 0 connected + callback(null, this.#server ? this[bunSocketServerConnections] : 0); + } + return this; + } + + listen(port, hostname, onListen) { + let backlog; + let path; + let exclusive = false; + //port is actually path + if (typeof port === "string") { + if (Number.isSafeInteger(hostname)) { + if (hostname > 0) { + //hostname is backlog + backlog = hostname; + } + } else if (typeof hostname === "function") { + //hostname is callback + onListen = hostname; + } + + path = port; + hostname = undefined; + port = undefined; + } else { + if (typeof hostname === "function") { + onListen = hostname; + hostname = undefined; + } + + if (typeof port === "function") { + onListen = port; + port = 0; + } else if (typeof port === "object") { + const options = port; + options.signal?.addEventListener("abort", () => this.close()); + + hostname = options.host; + exclusive = options.exclusive === true; + const path = options.path; + port = options.port; + + if (!Number.isSafeInteger(port) || port < 0) { + if (path) { + hostname = path; + port = undefined; + } else { + let message = 'The argument \'options\' must have the property "port" or "path"'; + try { + message = `${message}. Received ${JSON.stringify(options)}`; + } catch {} + + const error = new TypeError(message); + error.code = "ERR_INVALID_ARG_VALUE"; + throw error; + } + } else if (!Number.isSafeInteger(port) || port < 0) { + port = 0; + } + + // port <number> + // host <string> + // path <string> Will be ignored if port is specified. See Identifying paths for IPC connections. + // backlog <number> Common parameter of server.listen() functions. + // exclusive <boolean> Default: false + // readableAll <boolean> For IPC servers makes the pipe readable for all users. Default: false. + // writableAll <boolean> For IPC servers makes the pipe writable for all users. Default: false. + // ipv6Only <boolean> For TCP servers, setting ipv6Only to true will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound. Default: false. + // signal <AbortSignal> An AbortSignal that may be used to close a listening server. + + if (typeof port.callback === "function") onListen = port?.callback; + } else if (!Number.isSafeInteger(port) || port < 0) { + port = 0; + } + hostname = hostname || "::"; + } + + try { + this.#server = Bun.listen( + path + ? { + exclusive, + unix: path, + tls: false, + socket: SocketClass[bunSocketServerHandlers], + } + : { + exclusive, + port, + hostname, + tls: false, + socket: SocketClass[bunSocketServerHandlers], + }, + ); + + //make this instance available on handlers + this.#server.data = this; + + this.#listening = true; + process.nextTick(emitListeningNextTick, this, onListen); + } catch (err) { + this.#listening = false; + process.nextTick(emitErrorNextTick, this, err); + } + return this; + } +} + +function emitErrorNextTick(self, error) { + self.emit("error", error); +} + +function emitListeningNextTick(self, onListen) { + if (typeof onListen === "function") { + try { + onListen(); + } catch (err) { + self.emit("error", err); + } + } + self.emit("listening"); +} + +function createServer(options, connectionListener) { + return new Server(options, connectionListener); +} + export default { + createServer, + Server, createConnection, connect, isIP, |