aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/net.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/net.js')
-rw-r--r--src/js/node/net.js164
1 files changed, 96 insertions, 68 deletions
diff --git a/src/js/node/net.js b/src/js/node/net.js
index f0873ae22..56b0f9ced 100644
--- a/src/js/node/net.js
+++ b/src/js/node/net.js
@@ -66,6 +66,14 @@ const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::");
const bunSocketInternal = Symbol.for("::bunnetsocketinternal::");
const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::");
+function closeNT(self) {
+ self.emit("close");
+}
+function endNT(socket, callback, err) {
+ socket.end();
+ callback(err);
+}
+
var SocketClass;
const Socket = (function (InternalSocket) {
SocketClass = InternalSocket;
@@ -74,17 +82,15 @@ const Socket = (function (InternalSocket) {
enumerable: false,
});
- return Object.defineProperty(
- function Socket(options) {
- return new InternalSocket(options);
- },
- Symbol.hasInstance,
- {
- value(instance) {
- return instance instanceof InternalSocket;
- },
+ function Socket(options) {
+ return new InternalSocket(options);
+ }
+ Socket.prototype = InternalSocket.prototype;
+ return Object.defineProperty(Socket, Symbol.hasInstance, {
+ value(instance) {
+ return instance instanceof InternalSocket;
},
- );
+ });
})(
class Socket extends Duplex {
static #Handlers = {
@@ -317,7 +323,7 @@ const Socket = (function (InternalSocket) {
this._parent = this;
this._parentWrap = this;
this.#pendingRead = undefined;
- this.#upgraded = false;
+ this.#upgraded = null;
if (socket instanceof Socket) {
this.#socket = socket;
}
@@ -352,6 +358,14 @@ const Socket = (function (InternalSocket) {
Socket.#Drain(socket);
}
+ #closeRawConnection() {
+ const connection = this.#upgraded;
+ connection[bunSocketInternal] = null;
+ connection.unref();
+ connection.destroy();
+ process.nextTick(closeNT, connection);
+ }
+
connect(port, host, connectListener) {
var path;
var connection = this.#socket;
@@ -412,7 +426,7 @@ const Socket = (function (InternalSocket) {
var tls = undefined;
if (typeof bunTLS === "function") {
- tls = bunTLS.call(this, port, host, true);
+ tls = bunTLS.$call(this, port, host, true);
// Client always request Cert
this._requestCert = true;
this._rejectUnauthorized = rejectUnauthorized;
@@ -446,90 +460,97 @@ const Socket = (function (InternalSocket) {
} else if (connectListener) this.on("connect", connectListener);
// start using existing connection
- if (connection) {
- const socket = connection[bunSocketInternal];
-
- if (socket) {
- this.connecting = true;
- this.#upgraded = true;
- const result = socket.upgradeTLS({
- data: this,
- tls,
- socket: Socket.#Handlers,
- });
- if (result) {
- const [raw, tls] = result;
- // replace socket
- connection[bunSocketInternal] = raw;
- raw.timeout(raw.timeout);
- raw.connecting = false;
- this[bunSocketInternal] = tls;
- } else {
- this[bunSocketInternal] = null;
- throw new Error("Invalid socket");
- }
- } else {
- // wait to be connected
- connection.once("connect", () => {
- const socket = connection[bunSocketInternal];
- if (!socket) return;
+ try {
+ if (connection) {
+ const socket = connection[bunSocketInternal];
+ if (socket) {
this.connecting = true;
- this.#upgraded = true;
+ this.#upgraded = connection;
const result = socket.upgradeTLS({
data: this,
tls,
socket: Socket.#Handlers,
});
-
if (result) {
const [raw, tls] = result;
// replace socket
connection[bunSocketInternal] = raw;
raw.timeout(raw.timeout);
+ this.once("end", this.#closeRawConnection);
raw.connecting = false;
this[bunSocketInternal] = tls;
} else {
this[bunSocketInternal] = null;
throw new Error("Invalid socket");
}
+ } else {
+ // wait to be connected
+ connection.once("connect", () => {
+ const socket = connection[bunSocketInternal];
+ if (!socket) return;
+
+ this.connecting = true;
+ this.#upgraded = connection;
+ const result = socket.upgradeTLS({
+ data: this,
+ tls,
+ socket: Socket.#Handlers,
+ });
+
+ if (result) {
+ const [raw, tls] = result;
+ // replace socket
+ connection[bunSocketInternal] = raw;
+ raw.timeout(raw.timeout);
+ this.once("end", this.#closeRawConnection);
+ raw.connecting = false;
+ this[bunSocketInternal] = tls;
+ } else {
+ this[bunSocketInternal] = null;
+ throw new Error("Invalid socket");
+ }
+ });
+ }
+ } else if (path) {
+ // start using unix socket
+ bunConnect({
+ data: this,
+ unix: path,
+ socket: Socket.#Handlers,
+ tls,
+ }).catch(error => {
+ this.emit("error", error);
+ this.emit("close");
+ });
+ } else {
+ // default start
+ bunConnect({
+ data: this,
+ hostname: host || "localhost",
+ port: port,
+ socket: Socket.#Handlers,
+ tls,
+ }).catch(error => {
+ this.emit("error", error);
+ this.emit("close");
});
}
- } else if (path) {
- // start using unix socket
- bunConnect({
- data: this,
- unix: path,
- socket: Socket.#Handlers,
- tls,
- }).catch(error => {
- this.emit("error", error);
- this.emit("close");
- });
- } else {
- // default start
- bunConnect({
- data: this,
- hostname: host || "localhost",
- port: port,
- socket: Socket.#Handlers,
- tls,
- }).catch(error => {
- this.emit("error", error);
- this.emit("close");
- });
+ } catch (error) {
+ process.nextTick(emitErrorAndCloseNextTick, this, error);
}
return this;
}
_destroy(err, callback) {
- this[bunSocketInternal]?.end();
- callback(err);
+ const socket = this[bunSocketInternal];
+ socket && process.nextTick(endNT, socket, callback, err);
}
_final(callback) {
this[bunSocketInternal]?.end();
callback();
+ process.nextTick(closeNT, this);
}
get localAddress() {
@@ -552,8 +573,10 @@ const Socket = (function (InternalSocket) {
const queue = this.#readQueue;
let chunk;
while ((chunk = queue.peek())) {
- if (!this.push(chunk)) return;
+ const can_continue = !this.push(chunk);
+ // always remove from queue push will queue it internally if needed
queue.shift();
+ if (!can_continue) break;
}
}
@@ -815,7 +838,7 @@ class Server extends EventEmitter {
const options = this[bunSocketServerOptions];
if (typeof bunTLS === "function") {
- [tls, TLSSocketClass] = bunTLS.call(this, port, hostname, false);
+ [tls, TLSSocketClass] = bunTLS.$call(this, port, hostname, false);
options.servername = tls.serverName;
options.InternalSocketClass = TLSSocketClass;
} else {
@@ -864,6 +887,11 @@ function emitErrorNextTick(self, error) {
self.emit("error", error);
}
+function emitErrorAndCloseNextTick(self, error) {
+ self.emit("error", error);
+ self.emit("close");
+}
+
function emitListeningNextTick(self, onListen) {
if (typeof onListen === "function") {
try {