diff options
author | 2023-01-04 04:06:24 -0800 | |
---|---|---|
committer | 2023-01-04 04:06:24 -0800 | |
commit | e2231f15e8a11474e4b97b094e144a1983d7bfb0 (patch) | |
tree | 1fc8c7baa244bc9ff4a2f8a9cd8b9990e3d1f573 | |
parent | a19c7b43047fdd1f51c0282ca08944add8040bd6 (diff) | |
download | bun-e2231f15e8a11474e4b97b094e144a1983d7bfb0.tar.gz bun-e2231f15e8a11474e4b97b094e144a1983d7bfb0.tar.zst bun-e2231f15e8a11474e4b97b094e144a1983d7bfb0.zip |
Support non-classes in node:net (#1712)
* Support non-classes
* Update net.exports.js
* Make it less observable
* Update net.exports.js
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
-rw-r--r-- | src/bun.js/net.exports.js | 414 |
1 files changed, 216 insertions, 198 deletions
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js index 183fa86e1..967d01a7f 100644 --- a/src/bun.js/net.exports.js +++ b/src/bun.js/net.exports.js @@ -57,246 +57,264 @@ const { Bun, createFIFO } = import.meta.primordials; const { connect: bunConnect } = Bun; const { Duplex } = import.meta.require("node:stream"); -export class Socket extends Duplex { - static #Handlers = { - close: Socket.#Close, - connectError(socket, error) { - const self = socket.data; - self.emit("error", error); +export const Socket = (function (InternalSocket) { + return Object.defineProperty( + function Socket(options) { + return new InternalSocket(options); + }, + Symbol.hasInstance, + { + value(instance) { + return instance instanceof InternalSocket; + }, }, - data(socket, buffer) { + ); +})( + class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + connectError(socket, error) { + const self = socket.data; + self.emit("error", error); + }, + data(socket, buffer) { + const self = socket.data; + self.bytesRead += buffer.length; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(buffer)) return; + } + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + self.#socket = socket; + self.connecting = false; + self.emit("connect"); + Socket.#Drain(socket); + }, + timeout() { + const self = socket.data; + self.emit("timeout"); + }, + }; + + static #Close(socket) { const self = socket.data; - self.bytesRead += buffer.length; + if (self.#closed) return; + self.#closed = true; const queue = self.#readQueue; if (queue.isEmpty()) { - if (self.push(buffer)) return; + if (self.push(null)) return; } - queue.push(buffer); - }, - drain: Socket.#Drain, - end: Socket.#Close, - error(socket, error) { + queue.push(null); + } + + static #Drain(socket) { const self = socket.data; const callback = self.#writeCallback; if (callback) { - self.#writeCallback = null; - callback(error); + const chunk = self.#writeChunk; + const written = socket.write(chunk); + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); + } } - self.emit("error", error); - }, - open(socket) { - const self = socket.data; - socket.timeout(self.timeout); - self.#socket = socket; - self.connecting = false; - self.emit("connect"); - Socket.#Drain(socket); - }, - timeout() { - const self = socket.data; - self.emit("timeout"); - }, - }; - - static #Close(socket) { - const self = socket.data; - if (self.#closed) return; - self.#closed = true; - const queue = self.#readQueue; - if (queue.isEmpty()) { - if (self.push(null)) return; } - queue.push(null); - } - static #Drain(socket) { - const self = socket.data; - const callback = self.#writeCallback; - if (callback) { - const chunk = self.#writeChunk; - const written = socket.write(chunk); - self.bytesWritten += written; - if (written < chunk.length) { - self.#writeChunk = chunk.slice(written); - } else { - self.#writeCallback = null; - self.#writeChunk = null; - callback(null); - } + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = createFIFO(); + remotePort; + #socket; + timeout = 0; + #writeCallback; + #writeChunk; + + constructor(options) { + super({ + allowHalfOpen: options?.allowHalfOpen || false, + readable: true, + writable: true, + }); + options?.signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + // TODO support `options.fd` } - } - - bytesRead = 0; - bytesWritten = 0; - #closed = false; - connecting = false; - localAddress = "127.0.0.1"; - #readQueue = createFIFO(); - remotePort; - #socket; - timeout = 0; - #writeCallback; - #writeChunk; - - constructor(options) { - super({ - allowHalfOpen: options?.allowHalfOpen || false, - readable: true, - writable: true, - }); - options?.signal?.once("abort", () => this.destroy()); - this.once("connect", () => this.emit("ready")); - // TODO support `options.fd` - } - address() { - return { - address: this.localAddress, - family: this.localFamily, - port: this.localPort, - }; - } - - get bufferSize() { - return this.writableLength; - } + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; + } - connect(port, host, connectListener) { - // TODO support IPC sockets - if (typeof host == "function") { - connectListener = host; - host = undefined; + get bufferSize() { + return this.writableLength; } - if (typeof port == "object") { - var { - port, - host, - // TODOs - localAddress, - localPort, - family, - hints, - lookup, - noDelay, - keepAlive, - keepAliveInitialDelay, - } = port; + + connect(port, host, connectListener) { + // TODO support IPC sockets + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + } = port; + } + this.connecting = true; + this.remotePort = port; + if (connectListener) this.on("connect", connectListener); + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + }); + return this; } - this.connecting = true; - this.remotePort = port; - if (connectListener) this.on("connect", connectListener); - bunConnect({ - data: this, - hostname: host || "localhost", - port: port, - socket: Socket.#Handlers, - }); - return this; - } - _destroy(err, callback) { - this.#socket?.end(); - callback(err); - } + _destroy(err, callback) { + this.#socket?.end(); + callback(err); + } - _final(callback) { - this.#socket.end(); - callback(); - } + _final(callback) { + this.#socket.end(); + callback(); + } - get localAddress() { - return "127.0.0.1"; - } + get localAddress() { + return "127.0.0.1"; + } - get localFamily() { - return "IPv4"; - } + get localFamily() { + return "IPv4"; + } - get localPort() { - return this.#socket?.localPort; - } + get localPort() { + return this.#socket?.localPort; + } - get pending() { - return this.connecting; - } + get pending() { + return this.connecting; + } - _read(size) { - const queue = this.#readQueue; - let chunk; - while (chunk = queue.peek()) { - if (!this.push(chunk)) break; - queue.shift(); + _read(size) { + const queue = this.#readQueue; + let chunk; + while ((chunk = queue.peek())) { + if (!this.push(chunk)) break; + queue.shift(); + } } - } - get readyState() { - if (this.connecting) return "opening"; - if (this.readable) { - return this.writable ? "open" : "readOnly"; - } else { - return this.writable ? "writeOnly" : "closed"; + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; + } } - } - ref() { - this.#socket?.ref(); - } + ref() { + this.#socket?.ref(); + } - get remoteAddress() { - return this.#socket.remoteAddress; - } + get remoteAddress() { + return this.#socket.remoteAddress; + } - get remoteFamily() { - return "IPv4"; - } + get remoteFamily() { + return "IPv4"; + } - resetAndDestroy() { - this.#socket?.end(); - } + resetAndDestroy() { + this.#socket?.end(); + } - setKeepAlive(enable = false, initialDelay = 0) { - // TODO - } + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + } - setNoDelay(noDelay = true) { - // TODO - } + setNoDelay(noDelay = true) { + // TODO + } - setTimeout(timeout, callback) { - this.#socket?.timeout(timeout); - this.timeout = timeout; - if (callback) this.once("timeout", callback); - return this; - } + setTimeout(timeout, callback) { + this.#socket?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } - unref() { - this.#socket?.unref(); - } + unref() { + this.#socket?.unref(); + } - _write(chunk, encoding, callback) { - if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding); - var written = this.#socket?.write(chunk); - if (written == chunk.length) { - callback(); - } else if (this.#writeCallback) { - callback(new Error("overlapping _write()")); - } else { - if (written > 0) chunk = chunk.slice(written); - this.#writeCallback = callback; - this.#writeChunk = chunk; + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "utf8") + chunk = Buffer.from(chunk, encoding); + var written = this.#socket?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) chunk = chunk.slice(written); + this.#writeCallback = callback; + this.#writeChunk = chunk; + } } - } -} + }, +); export function createConnection(port, host, connectListener) { if (typeof host == "function") { connectListener = host; host = undefined; } - var options = typeof port == "object" ? port : { - host: host, - port: port, - }; + var options = + typeof port == "object" + ? port + : { + host: host, + port: port, + }; return new Socket(options).connect(options, connectListener); } |