aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-03-13 20:42:35 -0300
committerGravatar GitHub <noreply@github.com> 2023-03-13 16:42:35 -0700
commitac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8 (patch)
tree9f710fcd00a4c2987212ab961bd57e67c0c60729 /src/bun.js
parent8d320d137393d88ada6961dc0781de9054a0c453 (diff)
downloadbun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.tar.gz
bun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.tar.zst
bun-ac9f8c0e93b6b91096a6dc8782f09a08c2e4f6c8.zip
feat(net.createServer) and adds socket.connect IPC support (#2337)
* added net.Server * fix fmt * fix cast and move test * fix node-net.tests.ts but breaks server data receive * fix end and close only emitting when data or pipe was added * fix socket starting paused * add echo test * fix fmt * fix fmt * on close if the socket is paused, keep paused until the user resumes it to match node behavior * resume on connect * fix getConnections, error on close, make _Handler private and create symbols for SocketServerHandlers * add path support (IPC) * fix unix domains support, add connect unix tests * fix unix socket tests * emit listening and listening error on next tick * fix connection flask test * try fix connect error on macos test * merge connection and drop tests * added exclusive option * fix socket.zig fmt * fix validation for options and add test for it * pass prettier fmt
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/bun/socket.zig22
-rw-r--r--src/bun.js/net.exports.js303
2 files changed, 316 insertions, 9 deletions
diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig
index e18d7f9e5..df71bfb23 100644
--- a/src/bun.js/api/bun/socket.zig
+++ b/src/bun.js/api/bun/socket.zig
@@ -218,6 +218,7 @@ pub const SocketConfig = struct {
ssl: ?JSC.API.ServerConfig.SSLConfig = null,
handlers: Handlers,
default_data: JSC.JSValue = .zero,
+ exclusive: bool = false,
pub fn fromJS(
opts: JSC.JSValue,
@@ -226,6 +227,7 @@ pub const SocketConfig = struct {
) ?SocketConfig {
var hostname_or_unix: JSC.ZigString.Slice = JSC.ZigString.Slice.empty;
var port: ?u16 = null;
+ var exclusive = false;
var ssl: ?JSC.API.ServerConfig.SSLConfig = null;
var default_data = JSValue.zero;
@@ -265,6 +267,10 @@ pub const SocketConfig = struct {
}
}
+ if (opts.getTruthy(globalObject, "exclusive")) |_| {
+ exclusive = true;
+ }
+
if (opts.getTruthy(globalObject, "hostname") orelse opts.getTruthy(globalObject, "host")) |hostname| {
if (!hostname.isString()) {
exception.* = JSC.toInvalidArguments("Expected \"hostname\" to be a string", .{}, globalObject).asObjectRef();
@@ -324,6 +330,7 @@ pub const SocketConfig = struct {
.ssl = ssl,
.handlers = handlers,
.default_data = default_data,
+ .exclusive = exclusive,
};
}
};
@@ -432,11 +439,12 @@ pub const Listener = struct {
var port = socket_config.port;
var ssl = socket_config.ssl;
var handlers = socket_config.handlers;
+ const exclusive = socket_config.exclusive;
handlers.is_server = true;
const ssl_enabled = ssl != null;
- const socket_flags: i32 = 0;
+ const socket_flags: i32 = if (exclusive) 1 else 0;
const ctx_opts: uws.us_socket_context_options_t = brk: {
var sock_ctx: uws.us_socket_context_options_t = undefined;
@@ -513,7 +521,7 @@ pub const Listener = struct {
);
}
- const connection: Listener.UnixOrHost = if (port) |port_| .{
+ var connection: Listener.UnixOrHost = if (port) |port_| .{
.host = .{ .host = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(), .port = port_ },
} else .{
.unix = (hostname_or_unix.cloneIfNeeded(bun.default_allocator) catch unreachable).slice(),
@@ -524,7 +532,8 @@ pub const Listener = struct {
.host => |c| {
var host = bun.default_allocator.dupeZ(u8, c.host) catch unreachable;
defer bun.default_allocator.free(host);
- break :brk uws.us_socket_context_listen(
+
+ const socket = uws.us_socket_context_listen(
@boolToInt(ssl_enabled),
socket_context,
normalizeHost(@as([:0]const u8, host)),
@@ -532,6 +541,11 @@ pub const Listener = struct {
socket_flags,
8,
);
+ // should return the assigned port
+ if (socket) |s| {
+ connection.host.port = @intCast(u16, s.getLocalPort(ssl_enabled));
+ }
+ break :brk socket;
},
.unix => |u| {
var host = bun.default_allocator.dupeZ(u8, u) catch unreachable;
@@ -804,8 +818,6 @@ pub const Listener = struct {
default_data.ensureStillAlive();
- // const socket_flags: i32 = 0;
-
var handlers_ptr = handlers.vm.allocator.create(Handlers) catch @panic("OOM");
handlers_ptr.* = handlers;
handlers_ptr.is_server = false;
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js
index 6ed03d2b7..14973040e 100644
--- a/src/bun.js/net.exports.js
+++ b/src/bun.js/net.exports.js
@@ -56,8 +56,13 @@ export function isIP(s) {
const { Bun, createFIFO, Object } = import.meta.primordials;
const { connect: bunConnect } = Bun;
const { Duplex } = import.meta.require("node:stream");
+const { EventEmitter } = import.meta.require("node:events");
const bunTlsSymbol = Symbol.for("::buntls::");
+const bunSocketServerHandlers = Symbol.for("::bunsocket_serverhandlers::");
+const bunSocketServerConnections = Symbol.for("::bunnetserverconnections::");
+const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::");
+
var SocketClass;
export const Socket = (function (InternalSocket) {
SocketClass = InternalSocket;
@@ -116,7 +121,7 @@ export const Socket = (function (InternalSocket) {
self.emit("connect");
Socket.#Drain(socket);
},
- timeout() {
+ timeout(socket) {
const self = socket.data;
self.emit("timeout");
},
@@ -152,6 +157,57 @@ export const Socket = (function (InternalSocket) {
}
}
+ static [bunSocketServerHandlers] = {
+ data: Socket.#Handlers.data,
+ close(socket) {
+ Socket.#Handlers.close(socket);
+ this.data[bunSocketServerConnections]--;
+ },
+ end(socket) {
+ Socket.#Handlers.end(socket);
+ this.data[bunSocketServerConnections]--;
+ },
+ open(socket) {
+ const self = this.data;
+ const options = self[bunSocketServerOptions];
+ const { pauseOnConnect, connectionListener } = options;
+ const _socket = new Socket(options);
+ _socket.#attach(this.localPort, socket);
+ if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) {
+ const data = {
+ localAddress: _socket.localAddress,
+ localPort: _socket.localPort,
+ localFamily: _socket.localFamily,
+ remoteAddress: _socket.remoteAddress,
+ remotePort: _socket.remotePort,
+ remoteFamily: _socket.remoteFamily || "IPv4",
+ };
+ socket.end();
+
+ self.emit("drop", data);
+ return;
+ }
+ // the duplex implementation start paused, so we resume when pauseOnConnect is falsy
+ if (!pauseOnConnect) {
+ _socket.resume();
+ }
+
+ self[bunSocketServerConnections]++;
+ if (typeof connectionListener == "function") {
+ connectionListener(_socket);
+ }
+ self.emit("connection", _socket);
+ },
+ error(socket, error) {
+ Socket.#Handlers.error(socket, error);
+ this.data.emit("error", error);
+ },
+ timeout: Socket.#Handlers.timeout,
+ connectError: Socket.#Handlers.connectError,
+ drain: Socket.#Handlers.drain,
+ binaryType: "buffer",
+ };
+
bytesRead = 0;
bytesWritten = 0;
#closed = false;
@@ -190,12 +246,27 @@ export const Socket = (function (InternalSocket) {
return this.writableLength;
}
+ #attach(port, socket) {
+ this.remotePort = port;
+ socket.data = this;
+ socket.timeout(this.timeout);
+ socket.ref();
+ this.#socket = socket;
+ this.connecting = false;
+ this.emit("connect");
+ Socket.#Drain(socket);
+ }
+
connect(port, host, connectListener) {
- // TODO support IPC sockets
var path;
- if (arguments.length === 1 && typeof port === "string") {
+ if (typeof port === "string") {
path = port;
port = undefined;
+
+ if (typeof host === "function") {
+ connectListener = host;
+ host = undefined;
+ }
} else if (typeof host == "function") {
if (typeof port === "string") {
path = port;
@@ -228,6 +299,7 @@ export const Socket = (function (InternalSocket) {
if (typeof bunTLS === "function") {
tls = bunTLS.call(this, port, host);
}
+
bunConnect(
path
? {
@@ -296,7 +368,7 @@ export const Socket = (function (InternalSocket) {
}
get remoteAddress() {
- return this.#socket.remoteAddress;
+ return this.#socket?.remoteAddress;
}
get remoteFamily() {
@@ -368,7 +440,230 @@ export function createConnection(port, host, connectListener) {
export const connect = createConnection;
+class Server extends EventEmitter {
+ #server;
+ #listening = false;
+ [bunSocketServerConnections] = 0;
+ [bunSocketServerOptions];
+ maxConnections = 0;
+
+ constructor(options, connectionListener) {
+ super();
+
+ if (typeof options === "function") {
+ connectionListener = options;
+ options = {};
+ } else if (options == null || typeof options === "object") {
+ options = { ...options };
+ } else {
+ throw new Error("bun-net-polyfill: invalid arguments");
+ }
+
+ const { maxConnections } = options;
+ this.maxConnections = Number.isSafeInteger(maxConnections) && maxConnections > 0 ? maxConnections : 0;
+
+ options.connectionListener = connectionListener;
+ this[bunSocketServerOptions] = options;
+ }
+
+ ref() {
+ this.#server?.ref();
+ return this;
+ }
+
+ unref() {
+ this.#server?.unref();
+ return this;
+ }
+
+ close(callback) {
+ if (this.#server) {
+ this.#server.stop(true);
+ this.#server = null;
+ this.#listening = false;
+ this[bunSocketServerConnections] = 0;
+ this.emit("close");
+ if (typeof callback === "function") {
+ callback();
+ }
+
+ return this;
+ }
+
+ if (typeof callback === "function") {
+ const error = new Error("Server is not running");
+ error.code = "ERR_SERVER_NOT_RUNNING";
+ callback(error);
+ }
+ return this;
+ }
+
+ address() {
+ const server = this.#server;
+ if (server) {
+ const unix = server.unix;
+ if (unix) {
+ return unix;
+ }
+
+ //TODO: fix adress when host is passed
+ let address = server.hostname;
+ const type = isIP(address);
+ const port = server.port;
+ if (typeof port === "number") {
+ return {
+ port,
+ address,
+ family: type ? `IPv${type}` : undefined,
+ };
+ }
+ if (type) {
+ return {
+ address,
+ family: type ? `IPv${type}` : undefined,
+ };
+ }
+
+ return address;
+ }
+ return null;
+ }
+
+ getConnections(callback) {
+ if (typeof callback === "function") {
+ //in Bun case we will never error on getConnections
+ //node only errors if in the middle of the couting the server got disconnected, what never happens in Bun
+ //if disconnected will only pass null as well and 0 connected
+ callback(null, this.#server ? this[bunSocketServerConnections] : 0);
+ }
+ return this;
+ }
+
+ listen(port, hostname, onListen) {
+ let backlog;
+ let path;
+ let exclusive = false;
+ //port is actually path
+ if (typeof port === "string") {
+ if (Number.isSafeInteger(hostname)) {
+ if (hostname > 0) {
+ //hostname is backlog
+ backlog = hostname;
+ }
+ } else if (typeof hostname === "function") {
+ //hostname is callback
+ onListen = hostname;
+ }
+
+ path = port;
+ hostname = undefined;
+ port = undefined;
+ } else {
+ if (typeof hostname === "function") {
+ onListen = hostname;
+ hostname = undefined;
+ }
+
+ if (typeof port === "function") {
+ onListen = port;
+ port = 0;
+ } else if (typeof port === "object") {
+ const options = port;
+ options.signal?.addEventListener("abort", () => this.close());
+
+ hostname = options.host;
+ exclusive = options.exclusive === true;
+ const path = options.path;
+ port = options.port;
+
+ if (!Number.isSafeInteger(port) || port < 0) {
+ if (path) {
+ hostname = path;
+ port = undefined;
+ } else {
+ let message = 'The argument \'options\' must have the property "port" or "path"';
+ try {
+ message = `${message}. Received ${JSON.stringify(options)}`;
+ } catch {}
+
+ const error = new TypeError(message);
+ error.code = "ERR_INVALID_ARG_VALUE";
+ throw error;
+ }
+ } else if (!Number.isSafeInteger(port) || port < 0) {
+ port = 0;
+ }
+
+ // port <number>
+ // host <string>
+ // path <string> Will be ignored if port is specified. See Identifying paths for IPC connections.
+ // backlog <number> Common parameter of server.listen() functions.
+ // exclusive <boolean> Default: false
+ // readableAll <boolean> For IPC servers makes the pipe readable for all users. Default: false.
+ // writableAll <boolean> For IPC servers makes the pipe writable for all users. Default: false.
+ // ipv6Only <boolean> For TCP servers, setting ipv6Only to true will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound. Default: false.
+ // signal <AbortSignal> An AbortSignal that may be used to close a listening server.
+
+ if (typeof port.callback === "function") onListen = port?.callback;
+ } else if (!Number.isSafeInteger(port) || port < 0) {
+ port = 0;
+ }
+ hostname = hostname || "::";
+ }
+
+ try {
+ this.#server = Bun.listen(
+ path
+ ? {
+ exclusive,
+ unix: path,
+ tls: false,
+ socket: SocketClass[bunSocketServerHandlers],
+ }
+ : {
+ exclusive,
+ port,
+ hostname,
+ tls: false,
+ socket: SocketClass[bunSocketServerHandlers],
+ },
+ );
+
+ //make this instance available on handlers
+ this.#server.data = this;
+
+ this.#listening = true;
+ process.nextTick(emitListeningNextTick, this, onListen);
+ } catch (err) {
+ this.#listening = false;
+ process.nextTick(emitErrorNextTick, this, err);
+ }
+ return this;
+ }
+}
+
+function emitErrorNextTick(self, error) {
+ self.emit("error", error);
+}
+
+function emitListeningNextTick(self, onListen) {
+ if (typeof onListen === "function") {
+ try {
+ onListen();
+ } catch (err) {
+ self.emit("error", err);
+ }
+ }
+ self.emit("listening");
+}
+
+function createServer(options, connectionListener) {
+ return new Server(options, connectionListener);
+}
+
export default {
+ createServer,
+ Server,
createConnection,
connect,
isIP,