diff options
Diffstat (limited to 'src/js/node/net.js')
-rw-r--r-- | src/js/node/net.js | 164 |
1 files changed, 96 insertions, 68 deletions
diff --git a/src/js/node/net.js b/src/js/node/net.js index f0873ae22..56b0f9ced 100644 --- a/src/js/node/net.js +++ b/src/js/node/net.js @@ -66,6 +66,14 @@ const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::"); const bunSocketInternal = Symbol.for("::bunnetsocketinternal::"); const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::"); +function closeNT(self) { + self.emit("close"); +} +function endNT(socket, callback, err) { + socket.end(); + callback(err); +} + var SocketClass; const Socket = (function (InternalSocket) { SocketClass = InternalSocket; @@ -74,17 +82,15 @@ const Socket = (function (InternalSocket) { enumerable: false, }); - return Object.defineProperty( - function Socket(options) { - return new InternalSocket(options); - }, - Symbol.hasInstance, - { - value(instance) { - return instance instanceof InternalSocket; - }, + function Socket(options) { + return new InternalSocket(options); + } + Socket.prototype = InternalSocket.prototype; + return Object.defineProperty(Socket, Symbol.hasInstance, { + value(instance) { + return instance instanceof InternalSocket; }, - ); + }); })( class Socket extends Duplex { static #Handlers = { @@ -317,7 +323,7 @@ const Socket = (function (InternalSocket) { this._parent = this; this._parentWrap = this; this.#pendingRead = undefined; - this.#upgraded = false; + this.#upgraded = null; if (socket instanceof Socket) { this.#socket = socket; } @@ -352,6 +358,14 @@ const Socket = (function (InternalSocket) { Socket.#Drain(socket); } + #closeRawConnection() { + const connection = this.#upgraded; + connection[bunSocketInternal] = null; + connection.unref(); + connection.destroy(); + process.nextTick(closeNT, connection); + } + connect(port, host, connectListener) { var path; var connection = this.#socket; @@ -412,7 +426,7 @@ const Socket = (function (InternalSocket) { var tls = undefined; if (typeof bunTLS === "function") { - tls = bunTLS.call(this, port, host, true); + tls = bunTLS.$call(this, port, host, true); // Client always request Cert this._requestCert = true; this._rejectUnauthorized = rejectUnauthorized; @@ -446,90 +460,97 @@ const Socket = (function (InternalSocket) { } else if (connectListener) this.on("connect", connectListener); // start using existing connection - if (connection) { - const socket = connection[bunSocketInternal]; - - if (socket) { - this.connecting = true; - this.#upgraded = true; - const result = socket.upgradeTLS({ - data: this, - tls, - socket: Socket.#Handlers, - }); - if (result) { - const [raw, tls] = result; - // replace socket - connection[bunSocketInternal] = raw; - raw.timeout(raw.timeout); - raw.connecting = false; - this[bunSocketInternal] = tls; - } else { - this[bunSocketInternal] = null; - throw new Error("Invalid socket"); - } - } else { - // wait to be connected - connection.once("connect", () => { - const socket = connection[bunSocketInternal]; - if (!socket) return; + try { + if (connection) { + const socket = connection[bunSocketInternal]; + if (socket) { this.connecting = true; - this.#upgraded = true; + this.#upgraded = connection; const result = socket.upgradeTLS({ data: this, tls, socket: Socket.#Handlers, }); - if (result) { const [raw, tls] = result; // replace socket connection[bunSocketInternal] = raw; raw.timeout(raw.timeout); + this.once("end", this.#closeRawConnection); raw.connecting = false; this[bunSocketInternal] = tls; } else { this[bunSocketInternal] = null; throw new Error("Invalid socket"); } + } else { + // wait to be connected + connection.once("connect", () => { + const socket = connection[bunSocketInternal]; + if (!socket) return; + + this.connecting = true; + this.#upgraded = connection; + const result = socket.upgradeTLS({ + data: this, + tls, + socket: Socket.#Handlers, + }); + + if (result) { + const [raw, tls] = result; + // replace socket + connection[bunSocketInternal] = raw; + raw.timeout(raw.timeout); + this.once("end", this.#closeRawConnection); + raw.connecting = false; + this[bunSocketInternal] = tls; + } else { + this[bunSocketInternal] = null; + throw new Error("Invalid socket"); + } + }); + } + } else if (path) { + // start using unix socket + bunConnect({ + data: this, + unix: path, + socket: Socket.#Handlers, + tls, + }).catch(error => { + this.emit("error", error); + this.emit("close"); + }); + } else { + // default start + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + tls, + }).catch(error => { + this.emit("error", error); + this.emit("close"); }); } - } else if (path) { - // start using unix socket - bunConnect({ - data: this, - unix: path, - socket: Socket.#Handlers, - tls, - }).catch(error => { - this.emit("error", error); - this.emit("close"); - }); - } else { - // default start - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: Socket.#Handlers, - tls, - }).catch(error => { - this.emit("error", error); - this.emit("close"); - }); + } catch (error) { + process.nextTick(emitErrorAndCloseNextTick, this, error); } return this; } _destroy(err, callback) { - this[bunSocketInternal]?.end(); - callback(err); + const socket = this[bunSocketInternal]; + socket && process.nextTick(endNT, socket, callback, err); } _final(callback) { this[bunSocketInternal]?.end(); callback(); + process.nextTick(closeNT, this); } get localAddress() { @@ -552,8 +573,10 @@ const Socket = (function (InternalSocket) { const queue = this.#readQueue; let chunk; while ((chunk = queue.peek())) { - if (!this.push(chunk)) return; + const can_continue = !this.push(chunk); + // always remove from queue push will queue it internally if needed queue.shift(); + if (!can_continue) break; } } @@ -815,7 +838,7 @@ class Server extends EventEmitter { const options = this[bunSocketServerOptions]; if (typeof bunTLS === "function") { - [tls, TLSSocketClass] = bunTLS.call(this, port, hostname, false); + [tls, TLSSocketClass] = bunTLS.$call(this, port, hostname, false); options.servername = tls.serverName; options.InternalSocketClass = TLSSocketClass; } else { @@ -864,6 +887,11 @@ function emitErrorNextTick(self, error) { self.emit("error", error); } +function emitErrorAndCloseNextTick(self, error) { + self.emit("error", error); + self.emit("close"); +} + function emitListeningNextTick(self, onListen) { if (typeof onListen === "function") { try { |