diff options
Diffstat (limited to 'src/js/node/net.js')
-rw-r--r-- | src/js/node/net.js | 941 |
1 files changed, 461 insertions, 480 deletions
diff --git a/src/js/node/net.js b/src/js/node/net.js index 0513f44d3..c4efe2f62 100644 --- a/src/js/node/net.js +++ b/src/js/node/net.js @@ -21,6 +21,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. const { Duplex } = require("node:stream"); const EventEmitter = require("node:events"); +const { es5ClassCompat } = require("$shared"); // IPv4 Segment const v4Seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])"; @@ -66,394 +67,398 @@ const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::"); const bunSocketInternal = Symbol.for("::bunnetsocketinternal::"); const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::"); -var SocketClass; -const Socket = (function (InternalSocket) { - SocketClass = InternalSocket; - Object.defineProperty(SocketClass.prototype, Symbol.toStringTag, { - value: "Socket", - enumerable: false, - }); - - return Object.defineProperty( - function Socket(options) { - return new InternalSocket(options); - }, - Symbol.hasInstance, - { - value(instance) { - return instance instanceof InternalSocket; - }, - }, - ); -})( - class Socket extends Duplex { - static #Handlers = { - close: Socket.#Close, - connectError(socket, error) { - const self = socket.data; - self.emit("error", error); - }, - data({ data: self }, buffer) { - self.bytesRead += buffer.length; - const queue = self.#readQueue; - - if (queue.isEmpty()) { - if (self.push(buffer)) return; - } - queue.push(buffer); - }, - drain: Socket.#Drain, - end: Socket.#Close, - error(socket, error) { - const self = socket.data; - const callback = self.#writeCallback; - if (callback) { - self.#writeCallback = null; - callback(error); - } - self.emit("error", error); - }, - open(socket) { - const self = socket.data; - socket.timeout(self.timeout); - socket.ref(); - self[bunSocketInternal] = socket; - self.connecting = false; - const options = self[bunTLSConnectOptions]; - - if (options) { - const { session } = options; - if (session) { - self.setSession(session); - } - } - - if (!self.#upgraded) { - // this is not actually emitted on nodejs when socket used on the connection - // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake - self.emit("connect", self); - } - - Socket.#Drain(socket); - }, - handshake(socket, success, verifyError) { - const { data: self } = socket; - - self._securePending = false; - self.secureConnecting = false; - self._secureEstablished = !!success; - self.emit("secure", self); - - const { checkServerIdentity } = self[bunTLSConnectOptions]; - if (!verifyError && typeof checkServerIdentity === "function" && self.servername) { - const cert = self.getPeerCertificate(true); - verifyError = checkServerIdentity(self.servername, cert); - } - - if (self._requestCert || self._rejectUnauthorized) { - if (verifyError) { - self.authorized = false; - self.authorizationError = verifyError.code || verifyError.message; - if (self._rejectUnauthorized) { - self.destroy(verifyError); - return; - } - } - } else { - self.authorized = true; - } - self.emit("secureConnect", verifyError); - }, - timeout(socket) { - const self = socket.data; - self.emit("timeout", self); - }, - binaryType: "buffer", - }; - - static #Close(socket) { +class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + connectError(socket, error) { const self = socket.data; - if (self.#closed) return; - self.#closed = true; - //socket cannot be used after close - self[bunSocketInternal] = null; + self.emit("error", error); + }, + data({ data: self }, buffer) { + self.bytesRead += buffer.length; const queue = self.#readQueue; + if (queue.isEmpty()) { - if (self.push(null)) return; + if (self.push(buffer)) return; } - queue.push(null); - } - - static #Drain(socket) { + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { const self = socket.data; - const callback = self.#writeCallback; if (callback) { - const chunk = self.#writeChunk; - const written = socket.write(chunk); - - self.bytesWritten += written; - if (written < chunk.length) { - self.#writeChunk = chunk.slice(written); - } else { - self.#writeCallback = null; - self.#writeChunk = null; - callback(null); + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + socket.ref(); + self[bunSocketInternal] = socket; + self.connecting = false; + const options = self[bunTLSConnectOptions]; + + if (options) { + const { session } = options; + if (session) { + self.setSession(session); } } - } - static [bunSocketServerHandlers] = { - data: Socket.#Handlers.data, - close(socket) { - Socket.#Handlers.close(socket); - this.data[bunSocketServerConnections]--; - }, - end(socket) { - Socket.#Handlers.end(socket); - this.data[bunSocketServerConnections]--; - }, - open(socket) { - const self = this.data; - const options = self[bunSocketServerOptions]; - const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options; - const _socket = new InternalSocketClass({}); - _socket.isServer = true; - _socket._requestCert = requestCert; - _socket._rejectUnauthorized = rejectUnauthorized; - - _socket.#attach(this.localPort, socket); - if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { - const data = { - localAddress: _socket.localAddress, - localPort: _socket.localPort, - localFamily: _socket.localFamily, - remoteAddress: _socket.remoteAddress, - remotePort: _socket.remotePort, - remoteFamily: _socket.remoteFamily || "IPv4", - }; - - socket.end(); - - self.emit("drop", data); - return; - } - // the duplex implementation start paused, so we resume when pauseOnConnect is falsy - if (!pauseOnConnect) { - _socket.resume(); - } + if (!self.#upgraded) { + // this is not actually emitted on nodejs when socket used on the connection + // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake + self.emit("connect", self); + } - self[bunSocketServerConnections]++; + Socket.#Drain(socket); + }, + handshake(socket, success, verifyError) { + const { data: self } = socket; + + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; + self.emit("secure", self); + + const { checkServerIdentity } = self[bunTLSConnectOptions]; + if (!verifyError && typeof checkServerIdentity === "function" && self.servername) { + const cert = self.getPeerCertificate(true); + verifyError = checkServerIdentity(self.servername, cert); + } - if (typeof connectionListener == "function") { - if (InternalSocketClass.name === "TLSSocket") { - // add secureConnection event handler - self.once("secureConnection", () => connectionListener(_socket)); - } else { - connectionListener(_socket); + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + if (self._rejectUnauthorized) { + self.destroy(verifyError); + return; } } + } else { + self.authorized = true; + } + self.emit("secureConnect", verifyError); + }, + timeout(socket) { + const self = socket.data; + self.emit("timeout", self); + }, + binaryType: "buffer", + }; + + static #Close(socket) { + const self = socket.data; + if (self.#closed) return; + self.#closed = true; + //socket cannot be used after close + self[bunSocketInternal] = null; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(null)) return; + } + queue.push(null); + } - self.emit("connection", _socket); - }, - handshake(socket, success, verifyError) { - const { data: self } = socket; - self.emit("secure", self); - - self._securePending = false; - self.secureConnecting = false; - self._secureEstablished = !!success; - - if (self._requestCert || self._rejectUnauthorized) { - if (verifyError) { - self.authorized = false; - self.authorizationError = verifyError.code || verifyError.message; - if (self._rejectUnauthorized) { - self.destroy(verifyError); - return; - } - } - } else { - self.authorized = true; - } - self.emit("secureConnection", verifyError); - }, - error(socket, error) { - Socket.#Handlers.error(socket, error); - this.data.emit("error", error); - }, - timeout: Socket.#Handlers.timeout, - connectError: Socket.#Handlers.connectError, - drain: Socket.#Handlers.drain, - binaryType: "buffer", - }; + static #Drain(socket) { + const self = socket.data; - bytesRead = 0; - bytesWritten = 0; - #closed = false; - connecting = false; - localAddress = "127.0.0.1"; - #readQueue = $createFIFO(); - remotePort; - [bunSocketInternal] = null; - [bunTLSConnectOptions] = null; - timeout = 0; - #writeCallback; - #writeChunk; - #pendingRead; - - isServer = false; - _handle; - _parent; - _parentWrap; - #socket; - #upgraded; - - constructor(options) { - const { socket, signal, write, read, allowHalfOpen = false, ...opts } = options || {}; - super({ - ...opts, - allowHalfOpen, - readable: true, - writable: true, - }); - this._handle = this; - this._parent = this; - this._parentWrap = this; - this.#pendingRead = undefined; - this.#upgraded = false; - if (socket instanceof Socket) { - this.#socket = socket; + const callback = self.#writeCallback; + if (callback) { + const chunk = self.#writeChunk; + const written = socket.write(chunk); + + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); } - signal?.once("abort", () => this.destroy()); - this.once("connect", () => this.emit("ready")); } + } - address() { - return { - address: this.localAddress, - family: this.localFamily, - port: this.localPort, - }; - } + static [bunSocketServerHandlers] = { + data: Socket.#Handlers.data, + close(socket) { + Socket.#Handlers.close(socket); + this.data[bunSocketServerConnections]--; + }, + end(socket) { + Socket.#Handlers.end(socket); + this.data[bunSocketServerConnections]--; + }, + open(socket) { + const self = this.data; + const options = self[bunSocketServerOptions]; + const { pauseOnConnect, connectionListener, InternalSocketClass, requestCert, rejectUnauthorized } = options; + const _socket = new InternalSocketClass({}); + _socket.isServer = true; + _socket._requestCert = requestCert; + _socket._rejectUnauthorized = rejectUnauthorized; + + _socket.#attach(this.localPort, socket); + if (self.maxConnections && self[bunSocketServerConnections] >= self.maxConnections) { + const data = { + localAddress: _socket.localAddress, + localPort: _socket.localPort, + localFamily: _socket.localFamily, + remoteAddress: _socket.remoteAddress, + remotePort: _socket.remotePort, + remoteFamily: _socket.remoteFamily || "IPv4", + }; - get bufferSize() { - return this.writableLength; - } + socket.end(); - #attach(port, socket) { - this.remotePort = port; - socket.data = this; - socket.timeout(this.timeout); - socket.ref(); - this[bunSocketInternal] = socket; - this.connecting = false; - if (!this.#upgraded) { - // this is not actually emitted on nodejs when socket used on the connection - // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake - this.emit("connect", this); + self.emit("drop", data); + return; + } + // the duplex implementation start paused, so we resume when pauseOnConnect is falsy + if (!pauseOnConnect) { + _socket.resume(); } - Socket.#Drain(socket); - } - connect(port, host, connectListener) { - var path; - var connection = this.#socket; - var _checkServerIdentity = undefined; - if (typeof port === "string") { - path = port; - port = undefined; + self[bunSocketServerConnections]++; - if (typeof host === "function") { - connectListener = host; - host = undefined; + if (typeof connectionListener == "function") { + if (InternalSocketClass.name === "TLSSocket") { + // add secureConnection event handler + self.once("secureConnection", () => connectionListener(_socket)); + } else { + connectionListener(_socket); } - } else if (typeof host == "function") { - if (typeof port === "string") { - path = port; - port = undefined; + } + + self.emit("connection", _socket); + }, + handshake(socket, success, verifyError) { + const { data: self } = socket; + self.emit("secure", self); + + self._securePending = false; + self.secureConnecting = false; + self._secureEstablished = !!success; + + if (self._requestCert || self._rejectUnauthorized) { + if (verifyError) { + self.authorized = false; + self.authorizationError = verifyError.code || verifyError.message; + if (self._rejectUnauthorized) { + self.destroy(verifyError); + return; + } } + } else { + self.authorized = true; + } + self.emit("secureConnection", verifyError); + }, + error(socket, error) { + Socket.#Handlers.error(socket, error); + this.data.emit("error", error); + }, + timeout: Socket.#Handlers.timeout, + connectError: Socket.#Handlers.connectError, + drain: Socket.#Handlers.drain, + binaryType: "buffer", + }; + + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = $createFIFO(); + remotePort; + [bunSocketInternal] = null; + [bunTLSConnectOptions] = null; + timeout = 0; + #writeCallback; + #writeChunk; + #pendingRead; + + isServer = false; + _handle; + _parent; + _parentWrap; + #socket; + #upgraded; + + constructor(options) { + const { socket, signal, write, read, allowHalfOpen = false, ...opts } = options || {}; + super({ + ...opts, + allowHalfOpen, + readable: true, + writable: true, + }); + this._handle = this; + this._parent = this; + this._parentWrap = this; + this.#pendingRead = undefined; + this.#upgraded = false; + if (socket instanceof Socket) { + this.#socket = socket; + } + signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + } + + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; + } + get bufferSize() { + return this.writableLength; + } + + #attach(port, socket) { + this.remotePort = port; + socket.data = this; + socket.timeout(this.timeout); + socket.ref(); + this[bunSocketInternal] = socket; + this.connecting = false; + if (!this.#upgraded) { + // this is not actually emitted on nodejs when socket used on the connection + // this is already emmited on non-TLS socket and on TLS socket is emmited secureConnect after handshake + this.emit("connect", this); + } + Socket.#Drain(socket); + } + + connect(port, host, connectListener) { + var path; + var connection = this.#socket; + var _checkServerIdentity = undefined; + if (typeof port === "string") { + path = port; + port = undefined; + + if (typeof host === "function") { connectListener = host; host = undefined; } - if (typeof port == "object") { - var { - port, - host, - path, - socket, - // TODOs - localAddress, - localPort, - family, - hints, - lookup, - noDelay, - keepAlive, - keepAliveInitialDelay, - requestCert, - rejectUnauthorized, - pauseOnConnect, - servername, - checkServerIdentity, - session, - } = port; - _checkServerIdentity = checkServerIdentity; - this.servername = servername; - if (socket) { - connection = socket; - } + } else if (typeof host == "function") { + if (typeof port === "string") { + path = port; + port = undefined; } - if (!pauseOnConnect) { - this.resume(); + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + path, + socket, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + requestCert, + rejectUnauthorized, + pauseOnConnect, + servername, + checkServerIdentity, + session, + } = port; + _checkServerIdentity = checkServerIdentity; + this.servername = servername; + if (socket) { + connection = socket; } - this.connecting = true; - this.remotePort = port; + } - const bunTLS = this[bunTlsSymbol]; - var tls = undefined; + if (!pauseOnConnect) { + this.resume(); + } + this.connecting = true; + this.remotePort = port; - if (typeof bunTLS === "function") { - tls = bunTLS.call(this, port, host, true); - // Client always request Cert - this._requestCert = true; - this._rejectUnauthorized = rejectUnauthorized; - - if (tls) { - tls.rejectUnauthorized = rejectUnauthorized; - tls.requestCert = true; - tls.session = session || tls.session; - this.servername = tls.servername; - tls.checkServerIdentity = _checkServerIdentity || tls.checkServerIdentity; - this[bunTLSConnectOptions] = tls; - if (!connection && tls.socket) { - connection = tls.socket; - } - } - if (connection) { - if ( - typeof connection !== "object" || - !(connection instanceof Socket) || - typeof connection[bunTlsSymbol] === "function" - ) { - throw new TypeError("socket must be an instance of net.Socket"); - } - } - this.authorized = false; - this.secureConnecting = true; - this._secureEstablished = false; - this._securePending = true; + const bunTLS = this[bunTlsSymbol]; + var tls = undefined; - if (connectListener) this.on("secureConnect", connectListener); - } else if (connectListener) this.on("connect", connectListener); - // start using existing connection + if (typeof bunTLS === "function") { + tls = bunTLS.call(this, port, host, true); + // Client always request Cert + this._requestCert = true; + this._rejectUnauthorized = rejectUnauthorized; + if (tls) { + tls.rejectUnauthorized = rejectUnauthorized; + tls.requestCert = true; + tls.session = session || tls.session; + this.servername = tls.servername; + tls.checkServerIdentity = _checkServerIdentity || tls.checkServerIdentity; + this[bunTLSConnectOptions] = tls; + if (!connection && tls.socket) { + connection = tls.socket; + } + } if (connection) { - const socket = connection[bunSocketInternal]; + if ( + typeof connection !== "object" || + !(connection instanceof Socket) || + typeof connection[bunTlsSymbol] === "function" + ) { + throw new TypeError("socket must be an instance of net.Socket"); + } + } + this.authorized = false; + this.secureConnecting = true; + this._secureEstablished = false; + this._securePending = true; + + if (connectListener) this.on("secureConnect", connectListener); + } else if (connectListener) this.on("connect", connectListener); + // start using existing connection + + if (connection) { + const socket = connection[bunSocketInternal]; + + if (socket) { + this.connecting = true; + this.#upgraded = true; + const result = socket.upgradeTLS({ + data: this, + tls, + socket: Socket.#Handlers, + }); + if (result) { + const [raw, tls] = result; + // replace socket + connection[bunSocketInternal] = raw; + raw.timeout(raw.timeout); + raw.connecting = false; + this[bunSocketInternal] = tls; + } else { + this[bunSocketInternal] = null; + throw new Error("Invalid socket"); + } + } else { + // wait to be connected + connection.once("connect", () => { + const socket = connection[bunSocketInternal]; + if (!socket) return; - if (socket) { this.connecting = true; this.#upgraded = true; const result = socket.upgradeTLS({ @@ -461,6 +466,7 @@ const Socket = (function (InternalSocket) { tls, socket: Socket.#Handlers, }); + if (result) { const [raw, tls] = result; // replace socket @@ -472,161 +478,136 @@ const Socket = (function (InternalSocket) { this[bunSocketInternal] = null; throw new Error("Invalid socket"); } - } else { - // wait to be connected - connection.once("connect", () => { - const socket = connection[bunSocketInternal]; - if (!socket) return; - - this.connecting = true; - this.#upgraded = true; - const result = socket.upgradeTLS({ - data: this, - tls, - socket: Socket.#Handlers, - }); - - if (result) { - const [raw, tls] = result; - // replace socket - connection[bunSocketInternal] = raw; - raw.timeout(raw.timeout); - raw.connecting = false; - this[bunSocketInternal] = tls; - } else { - this[bunSocketInternal] = null; - throw new Error("Invalid socket"); - } - }); - } - } else if (path) { - // start using unix socket - bunConnect({ - data: this, - unix: path, - socket: Socket.#Handlers, - tls, - }).catch(error => { - this.emit("error", error); - }); - } else { - // default start - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: Socket.#Handlers, - tls, - }).catch(error => { - this.emit("error", error); }); } - return this; + } else if (path) { + // start using unix socket + bunConnect({ + data: this, + unix: path, + socket: Socket.#Handlers, + tls, + }).catch(error => { + this.emit("error", error); + }); + } else { + // default start + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + tls, + }).catch(error => { + this.emit("error", error); + }); } + return this; + } - _destroy(err, callback) { - this[bunSocketInternal]?.end(); - callback(err); - } + _destroy(err, callback) { + this[bunSocketInternal]?.end(); + callback(err); + } - _final(callback) { - this[bunSocketInternal]?.end(); - callback(); - } + _final(callback) { + this[bunSocketInternal]?.end(); + callback(); + } - get localAddress() { - return "127.0.0.1"; - } + get localAddress() { + return "127.0.0.1"; + } - get localFamily() { - return "IPv4"; - } + get localFamily() { + return "IPv4"; + } - get localPort() { - return this[bunSocketInternal]?.localPort; - } + get localPort() { + return this[bunSocketInternal]?.localPort; + } - get pending() { - return this.connecting; - } + get pending() { + return this.connecting; + } - _read(size) { - const queue = this.#readQueue; - let chunk; - while ((chunk = queue.peek())) { - if (!this.push(chunk)) return; - queue.shift(); - } + _read(size) { + const queue = this.#readQueue; + let chunk; + while ((chunk = queue.peek())) { + if (!this.push(chunk)) return; + queue.shift(); } + } - get readyState() { - if (this.connecting) return "opening"; - if (this.readable) { - return this.writable ? "open" : "readOnly"; - } else { - return this.writable ? "writeOnly" : "closed"; - } + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; } + } - ref() { - this[bunSocketInternal]?.ref(); - } + ref() { + this[bunSocketInternal]?.ref(); + } - get remoteAddress() { - return this[bunSocketInternal]?.remoteAddress; - } + get remoteAddress() { + return this[bunSocketInternal]?.remoteAddress; + } - get remoteFamily() { - return "IPv4"; - } + get remoteFamily() { + return "IPv4"; + } - resetAndDestroy() { - this[bunSocketInternal]?.end(); - } + resetAndDestroy() { + this[bunSocketInternal]?.end(); + } - setKeepAlive(enable = false, initialDelay = 0) { - // TODO - return this; - } + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + return this; + } - setNoDelay(noDelay = true) { - // TODO - return this; - } + setNoDelay(noDelay = true) { + // TODO + return this; + } - setTimeout(timeout, callback) { - this[bunSocketInternal]?.timeout(timeout); - this.timeout = timeout; - if (callback) this.once("timeout", callback); - return this; - } + setTimeout(timeout, callback) { + this[bunSocketInternal]?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } - unref() { - this[bunSocketInternal]?.unref(); - } + unref() { + this[bunSocketInternal]?.unref(); + } - _write(chunk, encoding, callback) { - if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding); - var written = this[bunSocketInternal]?.write(chunk); - if (written == chunk.length) { - callback(); - } else if (this.#writeCallback) { - callback(new Error("overlapping _write()")); - } else { - if (written > 0) { - if (typeof chunk == "string") { - chunk = chunk.slice(written); - } else { - chunk = chunk.subarray(written); - } + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding); + var written = this[bunSocketInternal]?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) { + if (typeof chunk == "string") { + chunk = chunk.slice(written); + } else { + chunk = chunk.subarray(written); } - - this.#writeCallback = callback; - this.#writeChunk = chunk; } + + this.#writeCallback = callback; + this.#writeChunk = chunk; } - }, -); + } +} +es5ClassCompat(Socket); function createConnection(port, host, connectListener) { if (typeof port === "object") { @@ -861,6 +842,7 @@ class Server extends EventEmitter { return this; } } +es5ClassCompat(Server); function emitErrorNextTick(self, error) { self.emit("error", error); @@ -890,5 +872,4 @@ export default { isIPv4, isIPv6, Socket, - [Symbol.for("::bunternal::")]: SocketClass, }; |