diff options
author | 2023-01-03 00:55:01 +0200 | |
---|---|---|
committer | 2023-01-02 14:55:01 -0800 | |
commit | 983b747020a644233e8a716798796b71bda01854 (patch) | |
tree | 2ca06ff09e369a96eeb54e06590bf9ba048a004f /src/bun.js/net.exports.js | |
parent | 1ba95a65eec3cb6dae7deb0dda9d123bed6f7a9e (diff) | |
download | bun-983b747020a644233e8a716798796b71bda01854.tar.gz bun-983b747020a644233e8a716798796b71bda01854.tar.zst bun-983b747020a644233e8a716798796b71bda01854.zip |
implement `net.Socket` (#1701)
- support TCP sockets for now, i.e. no IPC
- extra features like keep-alive, no-delay etc. are absent due to limitations of uSockets
- fix `jest` to treat `done(nullish)` as success
Diffstat (limited to 'src/bun.js/net.exports.js')
-rw-r--r-- | src/bun.js/net.exports.js | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js index fee0c9afa..47e4b68db 100644 --- a/src/bun.js/net.exports.js +++ b/src/bun.js/net.exports.js @@ -53,9 +53,257 @@ export function isIP(s) { return 0; } +const { Bun, createFIFO } = import.meta.primordials; +const { connect: bunConnect } = Bun; +const { Duplex } = import.meta.require("node:stream"); + +export class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + data(socket, buffer) { + const self = socket.data; + self.bytesRead += buffer.length; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(buffer)) return; + } + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + self.#socket = socket; + self.connecting = false; + self.emit("connect"); + Socket.#Drain(socket); + }, + timeout() { + const self = socket.data; + self.emit("timeout"); + }, + }; + + static #Close(socket) { + const self = socket.data; + if (self.#closed) return; + self.#closed = true; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(null)) return; + } + queue.push(null); + } + + static #Drain(socket) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + const chunk = self.#writeChunk; + const written = socket.write(chunk); + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); + } + } + } + + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = createFIFO(); + remotePort; + #socket; + timeout = 0; + #writeCallback; + #writeChunk; + + constructor(options) { + super({ + allowHalfOpen: options?.allowHalfOpen || false, + readable: true, + writable: true, + }); + options?.signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + // TODO support `options.fd` + } + + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; + } + + get bufferSize() { + return this.writableLength; + } + + connect(port, host, connectListener) { + // TODO support IPC sockets + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + } = port; + } + this.connecting = true; + this.remotePort = port; + if (connectListener) this.on("connect", connectListener); + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + }); + return this; + } + + _destroy(err, callback) { + this.#socket?.end(); + callback(err); + } + + _final(callback) { + this.#socket.end(); + callback(); + } + + get localAddress() { + return "127.0.0.1"; + } + + get localFamily() { + return "IPv4"; + } + + get localPort() { + return this.#socket?.localPort; + } + + get pending() { + return this.connecting; + } + + _read(size) { + const queue = this.#readQueue; + let chunk; + while (chunk = queue.peek()) { + if (!this.push(chunk)) break; + queue.shift(); + } + } + + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; + } + } + + ref() { + this.#socket?.ref(); + } + + get remoteAddress() { + return this.#socket.remoteAddress; + } + + get remoteFamily() { + return "IPv4"; + } + + resetAndDestroy() { + this.#socket?.end(); + } + + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + } + + setNoDelay(noDelay = true) { + // TODO + } + + setTimeout(timeout, callback) { + this.#socket?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } + + unref() { + this.#socket?.unref(); + } + + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding); + var written = this.#socket?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) chunk = chunk.slice(written); + this.#writeCallback = callback; + this.#writeChunk = chunk; + } + } +} + +export function createConnection(port, host, connectListener) { + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + var options = typeof port == "object" ? port : { + host: host, + port: port, + }; + return new Socket(options).connect(options, connectListener); +} + +export const connect = createConnection; + export default { + createConnection, + connect, isIP, isIPv4, isIPv6, + Socket, [Symbol.for("CommonJS")]: 0, }; |