diff options
Diffstat (limited to 'src/js/node/net.js')
-rw-r--r-- | src/js/node/net.js | 796 |
1 files changed, 796 insertions, 0 deletions
diff --git a/src/js/node/net.js b/src/js/node/net.js new file mode 100644 index 000000000..e767d0096 --- /dev/null +++ b/src/js/node/net.js @@ -0,0 +1,796 @@ +// Hardcoded module "node:net" +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// IPv4 Segment +const v4Seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])"; +const v4Str = `(${v4Seg}[.]){3}${v4Seg}`; +const IPv4Reg = new RegExp(`^${v4Str}$`); + +// IPv6 Segment +const v6Seg = "(?:[0-9a-fA-F]{1,4})"; +const IPv6Reg = new RegExp( + "^(" + + `(?:${v6Seg}:){7}(?:${v6Seg}|:)|` + + `(?:${v6Seg}:){6}(?:${v4Str}|:${v6Seg}|:)|` + + `(?:${v6Seg}:){5}(?::${v4Str}|(:${v6Seg}){1,2}|:)|` + + `(?:${v6Seg}:){4}(?:(:${v6Seg}){0,1}:${v4Str}|(:${v6Seg}){1,3}|:)|` + + `(?:${v6Seg}:){3}(?:(:${v6Seg}){0,2}:${v4Str}|(:${v6Seg}){1,4}|:)|` + + `(?:${v6Seg}:){2}(?:(:${v6Seg}){0,3}:${v4Str}|(:${v6Seg}){1,5}|:)|` + + `(?:${v6Seg}:){1}(?:(:${v6Seg}){0,4}:${v4Str}|(:${v6Seg}){1,6}|:)|` + + `(?::((?::${v6Seg}){0,5}:${v4Str}|(?::${v6Seg}){1,7}|:))` + + ")(%[0-9a-zA-Z-.:]{1,})?$", +); + +function isIPv4(s) { + return IPv4Reg.test(s); +} + +function isIPv6(s) { + return IPv6Reg.test(s); +} + +function isIP(s) { + if (isIPv4(s)) return 4; + if (isIPv6(s)) return 6; + return 0; +} + +const { Bun, createFIFO, Object } = import.meta.primordials; +const { connect: bunConnect } = Bun; +const { Duplex } = import.meta.require("node:stream"); +const { EventEmitter } = import.meta.require("node:events"); +var { setTimeout } = globalThis; + +const bunTlsSymbol = Symbol.for("::buntls::"); +const bunSocketServerHandlers = Symbol.for("::bunsocket_serverhandlers::"); +const bunSocketServerConnections = Symbol.for("::bunnetserverconnections::"); +const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::"); + +var SocketClass; +const Socket = (function (InternalSocket) { + SocketClass = InternalSocket; + Object.defineProperty(SocketClass.prototype, Symbol.toStringTag, { + value: "Socket", + enumerable: false, + }); + + return Object.defineProperty( + function Socket(options) { + return new InternalSocket(options); + }, + Symbol.hasInstance, + { + value(instance) { + return instance instanceof InternalSocket; + }, + }, + ); +})( + class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + connectError(socket, error) { + const self = socket.data; + + self.emit("error", error); + }, + data({ data: self }, buffer) { + self.bytesRead += buffer.length; + const queue = self.#readQueue; + + if (queue.isEmpty()) { + if (self.push(buffer)) return; + } + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + socket.ref(); + self.#socket = socket; + self.connecting = false; + self.emit("connect", self); + Socket.#Drain(socket); + }, + handshake(socket, success, verifyError) { + const { data: self } = socket; + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; + + // Needs getPeerCertificate support (not implemented yet) + // if (!verifyError && !this.isSessionReused()) { + // const hostname = options.servername || + // options.host || + // (options.socket && options.socket._host) || + // 'localhost'; + // const cert = this.getPeerCertificate(true); + // verifyError = options.checkServerIdentity(hostname, cert); + // } + + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + if (self._rejectUnauthorized) { + self.destroy(verifyError); + return; + } + } + } else { + self.authorized = true; + } + self.emit("secureConnect", verifyError); + }, + timeout(socket) { + const self = socket.data; + self.emit("timeout", self); + }, + binaryType: "buffer", + }; + + static #Close(socket) { + const self = socket.data; + if (self.#closed) return; + self.#closed = true; + //socket cannot be used after close + self.#socket = null; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(null)) return; + } + queue.push(null); + } + + static #Drain(socket) { + const self = socket.data; + + const callback = self.#writeCallback; + if (callback) { + const chunk = self.#writeChunk; + const written = socket.write(chunk); + + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); + } + } + } + + static [bunSocketServerHandlers] = { + data: Socket.#Handlers.data, + close(socket) { + Socket.#Handlers.close(socket); + this.data[bunSocketServerConnections]--; + }, + end(socket) { + Socket.#Handlers.end(socket); + this.data[bunSocketServerConnections]--; + }, + open(socket) { + const self = this.data; + const options = self[bunSocketServerOptions]; + const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options; + const _socket = new InternalSocketClass({}); + _socket.isServer = true; + _socket._requestCert = requestCert; + _socket._rejectUnauthorized = rejectUnauthorized; + + _socket.#attach(this.localPort, socket); + if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { + const data = { + localAddress: _socket.localAddress, + localPort: _socket.localPort, + localFamily: _socket.localFamily, + remoteAddress: _socket.remoteAddress, + remotePort: _socket.remotePort, + remoteFamily: _socket.remoteFamily || "IPv4", + }; + + socket.end(); + + self.emit("drop", data); + return; + } + // the duplex implementation start paused, so we resume when pauseOnConnect is falsy + if (!pauseOnConnect) { + _socket.resume(); + } + + self[bunSocketServerConnections]++; + + if (typeof connectionListener == "function") { + if (InternalSocketClass.name === "TLSSocket") { + // add secureConnection event handler + self.once("secureConnection", () => connectionListener(_socket)); + } else { + connectionListener(_socket); + } + } + + self.emit("connection", _socket); + }, + handshake({ data: self }, success, verifyError) { + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; + // Needs getPeerCertificate support (not implemented yet) + // if (!verifyError && !this.isSessionReused()) { + // const hostname = options.servername || + // options.host || + // (options.socket && options.socket._host) || + // 'localhost'; + // const cert = this.getPeerCertificate(true); + // verifyError = options.checkServerIdentity(hostname, cert); + // } + + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + if (self._rejectUnauthorized) { + self.destroy(verifyError); + return; + } + } + } else { + self.authorized = true; + } + self.emit("secureConnect", verifyError); + }, + error(socket, error) { + Socket.#Handlers.error(socket, error); + this.data.emit("error", error); + }, + timeout: Socket.#Handlers.timeout, + connectError: Socket.#Handlers.connectError, + drain: Socket.#Handlers.drain, + binaryType: "buffer", + }; + + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = createFIFO(); + remotePort; + #socket; + timeout = 0; + #writeCallback; + #writeChunk; + #pendingRead; + + isServer = false; + + constructor(options) { + const { signal, write, read, allowHalfOpen = false, ...opts } = options || {}; + super({ + ...opts, + allowHalfOpen, + readable: true, + writable: true, + }); + this.#pendingRead = undefined; + signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + } + + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; + } + + get bufferSize() { + return this.writableLength; + } + + #attach(port, socket) { + this.remotePort = port; + socket.data = this; + socket.timeout(this.timeout); + socket.ref(); + this.#socket = socket; + this.connecting = false; + this.emit("connect", this); + Socket.#Drain(socket); + } + + connect(port, host, connectListener) { + var path; + if (typeof port === "string") { + path = port; + port = undefined; + + if (typeof host === "function") { + connectListener = host; + host = undefined; + } + } else if (typeof host == "function") { + if (typeof port === "string") { + path = port; + port = undefined; + } + + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + path, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + requestCert, + rejectUnauthorized, + pauseOnConnect, + servername, + } = port; + this.servername = servername; + } + + if (!pauseOnConnect) { + this.resume(); + } + this.connecting = true; + this.remotePort = port; + + const bunTLS = this[bunTlsSymbol]; + var tls = undefined; + + if (typeof bunTLS === "function") { + tls = bunTLS.call(this, port, host, true); + // Client always request Cert + this._requestCert = true; + this._rejectUnauthorized = rejectUnauthorized; + + if (tls) { + // TLS can true/false or options + if (typeof tls !== "object") { + tls = { + rejectUnauthorized: rejectUnauthorized, + requestCert: true, + }; + } else { + tls.rejectUnauthorized = rejectUnauthorized; + tls.requestCert = true; + } + } + + this.authorized = false; + this.secureConnecting = true; + this._secureEstablished = false; + this._securePending = true; + if (connectListener) this.on("secureConnect", connectListener); + } else if (connectListener) this.on("connect", connectListener); + bunConnect( + path + ? { + data: this, + unix: path, + socket: Socket.#Handlers, + tls, + } + : { + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + tls, + }, + ); + return this; + } + + _destroy(err, callback) { + this.#socket?.end(); + callback(err); + } + + _final(callback) { + this.#socket?.end(); + callback(); + } + + get localAddress() { + return "127.0.0.1"; + } + + get localFamily() { + return "IPv4"; + } + + get localPort() { + return this.#socket?.localPort; + } + + get pending() { + return this.connecting; + } + + _read(size) { + const queue = this.#readQueue; + let chunk; + while ((chunk = queue.peek())) { + if (!this.push(chunk)) return; + queue.shift(); + } + } + + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; + } + } + + ref() { + this.#socket?.ref(); + } + + get remoteAddress() { + return this.#socket?.remoteAddress; + } + + get remoteFamily() { + return "IPv4"; + } + + resetAndDestroy() { + this.#socket?.end(); + } + + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + return this; + } + + setNoDelay(noDelay = true) { + // TODO + return this; + } + + setTimeout(timeout, callback) { + this.#socket?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } + + unref() { + this.#socket?.unref(); + } + + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding); + var written = this.#socket?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) { + if (typeof chunk == "string") { + chunk = chunk.slice(written); + } else { + chunk = chunk.subarray(written); + } + } + + this.#writeCallback = callback; + this.#writeChunk = chunk; + } + } + }, +); + +function createConnection(port, host, connectListener) { + if (typeof port === "object") { + // port is option pass Socket options and let connect handle connection options + return new Socket(port).connect(port, host, connectListener); + } + // port is path or host, let connect handle this + return new Socket().connect(port, host, connectListener); +} + +const connect = createConnection; + +class Server extends EventEmitter { + #server; + #listening = false; + [bunSocketServerConnections] = 0; + [bunSocketServerOptions]; + maxConnections = 0; + + constructor(options, connectionListener) { + super(); + + if (typeof options === "function") { + connectionListener = options; + options = {}; + } else if (options == null || typeof options === "object") { + options = { ...options }; + } else { + throw new Error("bun-net-polyfill: invalid arguments"); + } + + const { maxConnections } = options; + this.maxConnections = Number.isSafeInteger(maxConnections) && maxConnections > 0 ? maxConnections : 0; + + options.connectionListener = connectionListener; + this[bunSocketServerOptions] = options; + } + + ref() { + this.#server?.ref(); + return this; + } + + unref() { + this.#server?.unref(); + return this; + } + + close(callback) { + if (this.#server) { + this.#server.stop(true); + this.#server = null; + this.#listening = false; + this[bunSocketServerConnections] = 0; + this.emit("close"); + if (typeof callback === "function") { + callback(); + } + + return this; + } + + if (typeof callback === "function") { + const error = new Error("Server is not running"); + error.code = "ERR_SERVER_NOT_RUNNING"; + callback(error); + } + return this; + } + + address() { + const server = this.#server; + if (server) { + const unix = server.unix; + if (unix) { + return unix; + } + + //TODO: fix adress when host is passed + let address = server.hostname; + const type = isIP(address); + const port = server.port; + if (typeof port === "number") { + return { + port, + address, + family: type ? `IPv${type}` : undefined, + }; + } + if (type) { + return { + address, + family: type ? `IPv${type}` : undefined, + }; + } + + return address; + } + return null; + } + + getConnections(callback) { + if (typeof callback === "function") { + //in Bun case we will never error on getConnections + //node only errors if in the middle of the couting the server got disconnected, what never happens in Bun + //if disconnected will only pass null as well and 0 connected + callback(null, this.#server ? this[bunSocketServerConnections] : 0); + } + return this; + } + + listen(port, hostname, onListen) { + let backlog; + let path; + let exclusive = false; + //port is actually path + if (typeof port === "string") { + if (Number.isSafeInteger(hostname)) { + if (hostname > 0) { + //hostname is backlog + backlog = hostname; + } + } else if (typeof hostname === "function") { + //hostname is callback + onListen = hostname; + } + + path = port; + hostname = undefined; + port = undefined; + } else { + if (typeof hostname === "function") { + onListen = hostname; + hostname = undefined; + } + + if (typeof port === "function") { + onListen = port; + port = 0; + } else if (typeof port === "object") { + const options = port; + options.signal?.addEventListener("abort", () => this.close()); + + hostname = options.host; + exclusive = options.exclusive === true; + const path = options.path; + port = options.port; + + if (!Number.isSafeInteger(port) || port < 0) { + if (path) { + hostname = path; + port = undefined; + } else { + let message = 'The argument \'options\' must have the property "port" or "path"'; + try { + message = `${message}. Received ${JSON.stringify(options)}`; + } catch {} + + const error = new TypeError(message); + error.code = "ERR_INVALID_ARG_VALUE"; + throw error; + } + } else if (!Number.isSafeInteger(port) || port < 0) { + port = 0; + } + + // port <number> + // host <string> + // path <string> Will be ignored if port is specified. See Identifying paths for IPC connections. + // backlog <number> Common parameter of server.listen() functions. + // exclusive <boolean> Default: false + // readableAll <boolean> For IPC servers makes the pipe readable for all users. Default: false. + // writableAll <boolean> For IPC servers makes the pipe writable for all users. Default: false. + // ipv6Only <boolean> For TCP servers, setting ipv6Only to true will disable dual-stack support, i.e., binding to host :: won't make 0.0.0.0 be bound. Default: false. + // signal <AbortSignal> An AbortSignal that may be used to close a listening server. + + if (typeof port.callback === "function") onListen = port?.callback; + } else if (!Number.isSafeInteger(port) || port < 0) { + port = 0; + } + hostname = hostname || "::"; + } + + try { + var tls = undefined; + var TLSSocketClass = undefined; + const bunTLS = this[bunTlsSymbol]; + if (typeof bunTLS === "function") { + [tls, TLSSocketClass] = bunTLS.call(this, port, hostname, false); + } + + this[bunSocketServerOptions].InternalSocketClass = TLSSocketClass || SocketClass; + + this.#server = Bun.listen( + path + ? { + exclusive, + unix: path, + tls, + socket: SocketClass[bunSocketServerHandlers], + } + : { + exclusive, + port, + hostname, + tls, + socket: SocketClass[bunSocketServerHandlers], + }, + ); + + //make this instance available on handlers + this.#server.data = this; + + this.#listening = true; + + // We must schedule the emitListeningNextTick() only after the next run of + // the event loop's IO queue. Otherwise, the server may not actually be listening + // when the 'listening' event is emitted. + // + // That leads to all sorts of confusion. + // + // process.nextTick() is not sufficient because it will run before the IO queue. + setTimeout(emitListeningNextTick, 1, this, onListen); + } catch (err) { + this.#listening = false; + setTimeout(emitErrorNextTick, 1, this, err); + } + return this; + } +} + +function emitErrorNextTick(self, error) { + self.emit("error", error); +} + +function emitListeningNextTick(self, onListen) { + if (typeof onListen === "function") { + try { + onListen(); + } catch (err) { + self.emit("error", err); + } + } + self.emit("listening"); +} + +function createServer(options, connectionListener) { + return new Server(options, connectionListener); +} + +export default { + createServer, + Server, + createConnection, + connect, + isIP, + isIPv4, + isIPv6, + Socket, + [Symbol.for("CommonJS")]: 0, + [Symbol.for("::bunternal::")]: SocketClass, +}; + +export { createServer, Server, createConnection, connect, isIP, isIPv4, isIPv6, Socket }; |