aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/net.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/net.js')
-rw-r--r--src/js/node/net.js796
1 files changed, 796 insertions, 0 deletions
diff --git a/src/js/node/net.js b/src/js/node/net.js
new file mode 100644
index 000000000..e767d0096
--- /dev/null
+++ b/src/js/node/net.js
@@ -0,0 +1,796 @@
+// Hardcoded module "node:net"
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// IPv4 Segment
+const v4Seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])";
+const v4Str = `(${v4Seg}[.]){3}${v4Seg}`;
+const IPv4Reg = new RegExp(`^${v4Str}$`);
+
+// IPv6 Segment
+const v6Seg = "(?:[0-9a-fA-F]{1,4})";
+const IPv6Reg = new RegExp(
+ "^(" +
+ `(?:${v6Seg}:){7}(?:${v6Seg}|:)|` +
+ `(?:${v6Seg}:){6}(?:${v4Str}|:${v6Seg}|:)|` +
+ `(?:${v6Seg}:){5}(?::${v4Str}|(:${v6Seg}){1,2}|:)|` +
+ `(?:${v6Seg}:){4}(?:(:${v6Seg}){0,1}:${v4Str}|(:${v6Seg}){1,3}|:)|` +
+ `(?:${v6Seg}:){3}(?:(:${v6Seg}){0,2}:${v4Str}|(:${v6Seg}){1,4}|:)|` +
+ `(?:${v6Seg}:){2}(?:(:${v6Seg}){0,3}:${v4Str}|(:${v6Seg}){1,5}|:)|` +
+ `(?:${v6Seg}:){1}(?:(:${v6Seg}){0,4}:${v4Str}|(:${v6Seg}){1,6}|:)|` +
+ `(?::((?::${v6Seg}){0,5}:${v4Str}|(?::${v6Seg}){1,7}|:))` +
+ ")(%[0-9a-zA-Z-.:]{1,})?$",
+);
+
+function isIPv4(s) {
+ return IPv4Reg.test(s);
+}
+
+function isIPv6(s) {
+ return IPv6Reg.test(s);
+}
+
+function isIP(s) {
+ if (isIPv4(s)) return 4;
+ if (isIPv6(s)) return 6;
+ return 0;
+}
+
+const { Bun, createFIFO, Object } = import.meta.primordials;
+const { connect: bunConnect } = Bun;
+const { Duplex } = import.meta.require("node:stream");
+const { EventEmitter } = import.meta.require("node:events");
+var { setTimeout } = globalThis;
+
+const bunTlsSymbol = Symbol.for("::buntls::");
+const bunSocketServerHandlers = Symbol.for("::bunsocket_serverhandlers::");
+const bunSocketServerConnections = Symbol.for("::bunnetserverconnections::");
+const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::");
+
+var SocketClass;
+const Socket = (function (InternalSocket) {
+ SocketClass = InternalSocket;
+ Object.defineProperty(SocketClass.prototype, Symbol.toStringTag, {
+ value: "Socket",
+ enumerable: false,
+ });
+
+ return Object.defineProperty(
+ function Socket(options) {
+ return new InternalSocket(options);
+ },
+ Symbol.hasInstance,
+ {
+ value(instance) {
+ return instance instanceof InternalSocket;
+ },
+ },
+ );
+})(
+ class Socket extends Duplex {
+ static #Handlers = {
+ close: Socket.#Close,
+ connectError(socket, error) {
+ const self = socket.data;
+
+ self.emit("error", error);
+ },
+ data({ data: self }, buffer) {
+ self.bytesRead += buffer.length;
+ const queue = self.#readQueue;
+
+ if (queue.isEmpty()) {
+ if (self.push(buffer)) return;
+ }
+ queue.push(buffer);
+ },
+ drain: Socket.#Drain,
+ end: Socket.#Close,
+ error(socket, error) {
+ const self = socket.data;
+ const callback = self.#writeCallback;
+ if (callback) {
+ self.#writeCallback = null;
+ callback(error);
+ }
+ self.emit("error", error);
+ },
+ open(socket) {
+ const self = socket.data;
+ socket.timeout(self.timeout);
+ socket.ref();
+ self.#socket = socket;
+ self.connecting = false;
+ self.emit("connect", self);
+ Socket.#Drain(socket);
+ },
+ handshake(socket, success, verifyError) {
+ const { data: self } = socket;
+ self._securePending = false;
+ self.secureConnecting = false;
+ self._secureEstablished = !!success;
+
+ // Needs getPeerCertificate support (not implemented yet)
+ // if (!verifyError && !this.isSessionReused()) {
+ // const hostname = options.servername ||
+ // options.host ||
+ // (options.socket && options.socket._host) ||
+ // 'localhost';
+ // const cert = this.getPeerCertificate(true);
+ // verifyError = options.checkServerIdentity(hostname, cert);
+ // }
+
+ if (self._requestCert || self._rejectUnauthorized) {
+ if (verifyError) {
+ self.authorized = false;
+ self.authorizationError = verifyError.code || verifyError.message;
+ if (self._rejectUnauthorized) {
+ self.destroy(verifyError);
+ return;
+ }
+ }
+ } else {
+ self.authorized = true;
+ }
+ self.emit("secureConnect", verifyError);
+ },
+ timeout(socket) {
+ const self = socket.data;
+ self.emit("timeout", self);
+ },
+ binaryType: "buffer",
+ };
+
+ static #Close(socket) {
+ const self = socket.data;
+ if (self.#closed) return;
+ self.#closed = true;
+ //socket cannot be used after close
+ self.#socket = null;
+ const queue = self.#readQueue;
+ if (queue.isEmpty()) {
+ if (self.push(null)) return;
+ }
+ queue.push(null);
+ }
+
+ static #Drain(socket) {
+ const self = socket.data;
+
+ const callback = self.#writeCallback;
+ if (callback) {
+ const chunk = self.#writeChunk;
+ const written = socket.write(chunk);
+
+ self.bytesWritten += written;
+ if (written < chunk.length) {
+ self.#writeChunk = chunk.slice(written);
+ } else {
+ self.#writeCallback = null;
+ self.#writeChunk = null;
+ callback(null);
+ }
+ }
+ }
+
+ static [bunSocketServerHandlers] = {
+ data: Socket.#Handlers.data,
+ close(socket) {
+ Socket.#Handlers.close(socket);
+ this.data[bunSocketServerConnections]--;
+ },
+ end(socket) {
+ Socket.#Handlers.end(socket);
+ this.data[bunSocketServerConnections]--;
+ },
+ open(socket) {
+ const self = this.data;
+ const options = self[bunSocketServerOptions];
+ const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options;
+ const _socket = new InternalSocketClass({});
+ _socket.isServer = true;
+ _socket._requestCert = requestCert;
+ _socket._rejectUnauthorized = rejectUnauthorized;
+
+ _socket.#attach(this.localPort, socket);
+ if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) {
+ const data = {
+ localAddress: _socket.localAddress,
+ localPort: _socket.localPort,
+ localFamily: _socket.localFamily,
+ remoteAddress: _socket.remoteAddress,
+ remotePort: _socket.remotePort,
+ remoteFamily: _socket.remoteFamily || "IPv4",
+ };
+
+ socket.end();
+
+ self.emit("drop", data);
+ return;
+ }
+ // the duplex implementation start paused, so we resume when pauseOnConnect is falsy
+ if (!pauseOnConnect) {
+ _socket.resume();
+ }
+
+ self[bunSocketServerConnections]++;
+
+ if (typeof connectionListener == "function") {
+ if (InternalSocketClass.name === "TLSSocket") {
+ // add secureConnection event handler
+ self.once("secureConnection", () => connectionListener(_socket));
+ } else {
+ connectionListener(_socket);
+ }
+ }
+
+ self.emit("connection", _socket);
+ },
+ handshake({ data: self }, success, verifyError) {
+ self._securePending = false;
+ self.secureConnecting = false;
+ self._secureEstablished = !!success;
+ // Needs getPeerCertificate support (not implemented yet)
+ // if (!verifyError && !this.isSessionReused()) {
+ // const hostname = options.servername ||
+ // options.host ||
+ // (options.socket && options.socket._host) ||
+ // 'localhost';
+ // const cert = this.getPeerCertificate(true);
+ // verifyError = options.checkServerIdentity(hostname, cert);
+ // }
+
+ if (self._requestCert || self._rejectUnauthorized) {
+ if (verifyError) {
+ self.authorized = false;
+ self.authorizationError = verifyError.code || verifyError.message;
+ if (self._rejectUnauthorized) {
+ self.destroy(verifyError);
+ return;
+ }
+ }
+ } else {
+ self.authorized = true;
+ }
+ self.emit("secureConnect", verifyError);
+ },
+ error(socket, error) {
+ Socket.#Handlers.error(socket, error);
+ this.data.emit("error", error);
+ },
+ timeout: Socket.#Handlers.timeout,
+ connectError: Socket.#Handlers.connectError,
+ drain: Socket.#Handlers.drain,
+ binaryType: "buffer",
+ };
+
+ bytesRead = 0;
+ bytesWritten = 0;
+ #closed = false;
+ connecting = false;
+ localAddress = "127.0.0.1";
+ #readQueue = createFIFO();
+ remotePort;
+ #socket;
+ timeout = 0;
+ #writeCallback;
+ #writeChunk;
+ #pendingRead;
+
+ isServer = false;
+
+ constructor(options) {
+ const { signal, write, read, allowHalfOpen = false, ...opts } = options || {};
+ super({
+ ...opts,
+ allowHalfOpen,
+ readable: true,
+ writable: true,
+ });
+ this.#pendingRead = undefined;
+ signal?.once("abort", () => this.destroy());
+ this.once("connect", () => this.emit("ready"));
+ }
+
+ address() {
+ return {
+ address: this.localAddress,
+ family: this.localFamily,
+ port: this.localPort,
+ };
+ }
+
+ get bufferSize() {
+ return this.writableLength;
+ }
+
+ #attach(port, socket) {
+ this.remotePort = port;
+ socket.data = this;
+ socket.timeout(this.timeout);
+ socket.ref();
+ this.#socket = socket;
+ this.connecting = false;
+ this.emit("connect", this);
+ Socket.#Drain(socket);
+ }
+
+ connect(port, host, connectListener) {
+ var path;
+ if (typeof port === "string") {
+ path = port;
+ port = undefined;
+
+ if (typeof host === "function") {
+ connectListener = host;
+ host = undefined;
+ }
+ } else if (typeof host == "function") {
+ if (typeof port === "string") {
+ path = port;
+ port = undefined;
+ }
+
+ connectListener = host;
+ host = undefined;
+ }
+ if (typeof port == "object") {
+ var {
+ port,
+ host,
+ path,
+ // TODOs
+ localAddress,
+ localPort,
+ family,
+ hints,
+ lookup,
+ noDelay,
+ keepAlive,
+ keepAliveInitialDelay,
+ requestCert,
+ rejectUnauthorized,
+ pauseOnConnect,
+ servername,
+ } = port;
+ this.servername = servername;
+ }
+
+ if (!pauseOnConnect) {
+ this.resume();
+ }
+ this.connecting = true;
+ this.remotePort = port;
+
+ const bunTLS = this[bunTlsSymbol];
+ var tls = undefined;
+
+ if (typeof bunTLS === "function") {
+ tls = bunTLS.call(this, port, host, true);
+ // Client always request Cert
+ this._requestCert = true;
+ this._rejectUnauthorized = rejectUnauthorized;
+
+ if (tls) {
+ // TLS can true/false or options
+ if (typeof tls !== "object") {
+ tls = {
+ rejectUnauthorized: rejectUnauthorized,
+ requestCert: true,
+ };
+ } else {
+ tls.rejectUnauthorized = rejectUnauthorized;
+ tls.requestCert = true;
+ }
+ }
+
+ this.authorized = false;
+ this.secureConnecting = true;
+ this._secureEstablished = false;
+ this._securePending = true;
+ if (connectListener) this.on("secureConnect", connectListener);
+ } else if (connectListener) this.on("connect", connectListener);
+ bunConnect(
+ path
+ ? {
+ data: this,
+ unix: path,
+ socket: Socket.#Handlers,
+ tls,
+ }
+ : {
+ data: this,
+ hostname: host || "localhost",
+ port: port,
+ socket: Socket.#Handlers,
+ tls,
+ },
+ );
+ return this;
+ }
+
+ _destroy(err, callback) {
+ this.#socket?.end();
+ callback(err);
+ }
+
+ _final(callback) {
+ this.#socket?.end();
+ callback();
+ }
+
+ get localAddress() {
+ return "127.0.0.1";
+ }
+
+ get localFamily() {
+ return "IPv4";
+ }
+
+ get localPort() {
+ return this.#socket?.localPort;
+ }
+
+ get pending() {
+ return this.connecting;
+ }
+
+ _read(size) {
+ const queue = this.#readQueue;
+ let chunk;
+ while ((chunk = queue.peek())) {
+ if (!this.push(chunk)) return;
+ queue.shift();
+ }
+ }
+
+ get readyState() {
+ if (this.connecting) return "opening";
+ if (this.readable) {
+ return this.writable ? "open" : "readOnly";
+ } else {
+ return this.writable ? "writeOnly" : "closed";
+ }
+ }
+
+ ref() {
+ this.#socket?.ref();
+ }
+
+ get remoteAddress() {
+ return this.#socket?.remoteAddress;
+ }
+
+ get remoteFamily() {
+ return "IPv4";
+ }
+
+ resetAndDestroy() {
+ this.#socket?.end();
+ }
+
+ setKeepAlive(enable = false, initialDelay = 0) {
+ // TODO
+ return this;
+ }
+
+ setNoDelay(noDelay = true) {
+ // TODO
+ return this;
+ }
+
+ setTimeout(timeout, callback) {
+ this.#socket?.timeout(timeout);
+ this.timeout = timeout;
+ if (callback) this.once("timeout", callback);
+ return this;
+ }
+
+ unref() {
+ this.#socket?.unref();
+ }
+
+ _write(chunk, encoding, callback) {
+ if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding);
+ var written = this.#socket?.write(chunk);
+ if (written == chunk.length) {
+ callback();
+ } else if (this.#writeCallback) {
+ callback(new Error("overlapping _write()"));
+ } else {
+ if (written > 0) {
+ if (typeof chunk == "string") {
+ chunk = chunk.slice(written);
+ } else {
+ chunk = chunk.subarray(written);
+ }
+ }
+
+ this.#writeCallback = callback;
+ this.#writeChunk = chunk;
+ }
+ }
+ },
+);
+
+function createConnection(port, host, connectListener) {
+ if (typeof port === "object") {
+ // port is option pass Socket options and let connect handle connection options
+ return new Socket(port).connect(port, host, connectListener);
+ }
+ // port is path or host, let connect handle this
+ return new Socket().connect(port, host, connectListener);
+}
+
+const connect = createConnection;
+
+class Server extends EventEmitter {
+ #server;
+ #listening = false;
+ [bunSocketServerConnections] = 0;
+ [bunSocketServerOptions];
+ maxConnections = 0;
+
+ constructor(options, connectionListener) {
+ super();
+
+ if (typeof options === "function") {
+ connectionListener = options;
+ options = {};
+ } else if (options == null || typeof options === "object") {
+ options = { ...options };
+ } else {
+ throw new Error("bun-net-polyfill: invalid arguments");
+ }
+
+ const { maxConnections } = options;
+ this.maxConnections = Number.isSafeInteger(maxConnections) && maxConnections > 0 ? maxConnections : 0;
+
+ options.connectionListener = connectionListener;
+ this[bunSocketServerOptions] = options;
+ }
+
+ ref() {
+ this.#server?.ref();
+ return this;
+ }
+
+ unref() {
+ this.#server?.unref();
+ return this;
+ }
+
+ close(callback) {
+ if (this.#server) {
+ this.#server.stop(true);
+ this.#server = null;
+ this.#listening = false;
+ this[bunSocketServerConnections] = 0;
+ this.emit("close");
+ if (typeof callback === "function") {
+ callback();
+ }
+
+ return this;
+ }
+
+ if (typeof callback === "function") {
+ const error = new Error("Server is not running");
+ error.code = "ERR_SERVER_NOT_RUNNING";
+ callback(error);
+ }
+ return this;
+ }
+
+ address() {
+ const server = this.#server;
+ if (server) {
+ const unix = server.unix;
+ if (unix) {
+ return unix;
+ }
+
+ //TODO: fix adress when host is passed
+ let address = server.hostname;
+ const type = isIP(address);
+ const port = server.port;
+ if (typeof port === "number") {
+ return {
+ port,
+ address,
+ family: type ? `IPv${type}` : undefined,
+ };
+ }
+ if (type) {
+ return {
+ address,
+ family: type ? `IPv${type}` : undefined,
+ };
+ }
+
+ return address;
+ }
+ return null;
+ }
+
+ getConnections(callback) {
+ if (typeof callback === "function") {
+ //in Bun case we will never error on getConnections
+ //node only errors if in the middle of the couting the server got disconnected, what never happens in Bun
+ //if disconnected will only pass null as well and 0 connected
+ callback(null, this.#server ? this[bunSocketServerConnections] : 0);
+ }
+ return this;
+ }
+
+ listen(port, hostname, onListen) {
+ let backlog;
+ let path;
+ let exclusive = false;
+ //port is actually path
+ if (typeof port === "string") {
+ if (Number.isSafeInteger(hostname)) {
+ if (hostname > 0) {
+ //hostname is backlog
+ backlog = hostname;
+ }
+ } else if (typeof hostname === "function") {
+ //hostname is callback
+ onListen = hostname;
+ }
+
+ path = port;
+ hostname = undefined;
+ port = undefined;
+ } else {
+ if (typeof hostname === "function") {
+ onListen = hostname;
+ hostname = undefined;
+ }
+
+ if (typeof port === "function") {
+ onListen = port;
+ port = 0;
+ } else if (typeof port === "object") {
+ const options = port;
+ options.signal?.addEventListener("abort", () => this.close());
+
+ hostname = options.host;
+ exclusive = options.exclusive === true;
+ const path = options.path;
+ port = options.port;
+
+ if (!Number.isSafeInteger(port) || port < 0) {
+ if (path) {
+ hostname = path;
+ port = undefined;
+ } else {
+ let message = 'The argument \'options\' must have the property "port" or "path"';
+ try {
+ message = `${message}. Received ${JSON.stringify(options)}`;
+ } catch {}
+
+ const error = new TypeError(message);
+ error.code = "ERR_INVALID_ARG_VALUE";
+ throw error;
+ }
+ } else if (!Number.isSafeInteger(port) || port < 0) {
+ port = 0;
+ }
+
+ // port <number>
+ // host <string>
+ // path <string> Will be ignored if port is specified. See Identifying paths for IPC connections.
+ // backlog <number> Common parameter of server.listen() functions.
+ // exclusive <boolean> Default: false
+ // readableAll <boolean> For IPC servers makes the pipe readable for all users. Default: false.
+ // writableAll <boolean> For IPC servers makes the pipe writable for all users. Default: false.
+ // ipv6Only <boolean> For TCP servers, setting ipv6Only to true will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound. Default: false.
+ // signal <AbortSignal> An AbortSignal that may be used to close a listening server.
+
+ if (typeof port.callback === "function") onListen = port?.callback;
+ } else if (!Number.isSafeInteger(port) || port < 0) {
+ port = 0;
+ }
+ hostname = hostname || "::";
+ }
+
+ try {
+ var tls = undefined;
+ var TLSSocketClass = undefined;
+ const bunTLS = this[bunTlsSymbol];
+ if (typeof bunTLS === "function") {
+ [tls, TLSSocketClass] = bunTLS.call(this, port, hostname, false);
+ }
+
+ this[bunSocketServerOptions].InternalSocketClass = TLSSocketClass || SocketClass;
+
+ this.#server = Bun.listen(
+ path
+ ? {
+ exclusive,
+ unix: path,
+ tls,
+ socket: SocketClass[bunSocketServerHandlers],
+ }
+ : {
+ exclusive,
+ port,
+ hostname,
+ tls,
+ socket: SocketClass[bunSocketServerHandlers],
+ },
+ );
+
+ //make this instance available on handlers
+ this.#server.data = this;
+
+ this.#listening = true;
+
+ // We must schedule the emitListeningNextTick() only after the next run of
+ // the event loop's IO queue. Otherwise, the server may not actually be listening
+ // when the 'listening' event is emitted.
+ //
+ // That leads to all sorts of confusion.
+ //
+ // process.nextTick() is not sufficient because it will run before the IO queue.
+ setTimeout(emitListeningNextTick, 1, this, onListen);
+ } catch (err) {
+ this.#listening = false;
+ setTimeout(emitErrorNextTick, 1, this, err);
+ }
+ return this;
+ }
+}
+
+function emitErrorNextTick(self, error) {
+ self.emit("error", error);
+}
+
+function emitListeningNextTick(self, onListen) {
+ if (typeof onListen === "function") {
+ try {
+ onListen();
+ } catch (err) {
+ self.emit("error", err);
+ }
+ }
+ self.emit("listening");
+}
+
+function createServer(options, connectionListener) {
+ return new Server(options, connectionListener);
+}
+
+export default {
+ createServer,
+ Server,
+ createConnection,
+ connect,
+ isIP,
+ isIPv4,
+ isIPv6,
+ Socket,
+ [Symbol.for("CommonJS")]: 0,
+ [Symbol.for("::bunternal::")]: SocketClass,
+};
+
+export { createServer, Server, createConnection, connect, isIP, isIPv4, isIPv6, Socket };