aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/net.exports.js
diff options
context:
space:
mode:
authorGravatar Alex Lam S.L <alexlamsl@gmail.com> 2023-01-03 00:55:01 +0200
committerGravatar GitHub <noreply@github.com> 2023-01-02 14:55:01 -0800
commit983b747020a644233e8a716798796b71bda01854 (patch)
tree2ca06ff09e369a96eeb54e06590bf9ba048a004f /src/bun.js/net.exports.js
parent1ba95a65eec3cb6dae7deb0dda9d123bed6f7a9e (diff)
downloadbun-983b747020a644233e8a716798796b71bda01854.tar.gz
bun-983b747020a644233e8a716798796b71bda01854.tar.zst
bun-983b747020a644233e8a716798796b71bda01854.zip
implement `net.Socket` (#1701)
- support TCP sockets for now, i.e. no IPC - extra features like keep-alive, no-delay etc. are absent due to limitations of uSockets - fix `jest` to treat `done(nullish)` as success
Diffstat (limited to 'src/bun.js/net.exports.js')
-rw-r--r--src/bun.js/net.exports.js248
1 files changed, 248 insertions, 0 deletions
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js
index fee0c9afa..47e4b68db 100644
--- a/src/bun.js/net.exports.js
+++ b/src/bun.js/net.exports.js
@@ -53,9 +53,257 @@ export function isIP(s) {
return 0;
}
+const { Bun, createFIFO } = import.meta.primordials;
+const { connect: bunConnect } = Bun;
+const { Duplex } = import.meta.require("node:stream");
+
+export class Socket extends Duplex {
+ static #Handlers = {
+ close: Socket.#Close,
+ data(socket, buffer) {
+ const self = socket.data;
+ self.bytesRead += buffer.length;
+ const queue = self.#readQueue;
+ if (queue.isEmpty()) {
+ if (self.push(buffer)) return;
+ }
+ queue.push(buffer);
+ },
+ drain: Socket.#Drain,
+ end: Socket.#Close,
+ error(socket, error) {
+ const self = socket.data;
+ const callback = self.#writeCallback;
+ if (callback) {
+ self.#writeCallback = null;
+ callback(error);
+ }
+ self.emit("error", error);
+ },
+ open(socket) {
+ const self = socket.data;
+ socket.timeout(self.timeout);
+ self.#socket = socket;
+ self.connecting = false;
+ self.emit("connect");
+ Socket.#Drain(socket);
+ },
+ timeout() {
+ const self = socket.data;
+ self.emit("timeout");
+ },
+ };
+
+ static #Close(socket) {
+ const self = socket.data;
+ if (self.#closed) return;
+ self.#closed = true;
+ const queue = self.#readQueue;
+ if (queue.isEmpty()) {
+ if (self.push(null)) return;
+ }
+ queue.push(null);
+ }
+
+ static #Drain(socket) {
+ const self = socket.data;
+ const callback = self.#writeCallback;
+ if (callback) {
+ const chunk = self.#writeChunk;
+ const written = socket.write(chunk);
+ self.bytesWritten += written;
+ if (written < chunk.length) {
+ self.#writeChunk = chunk.slice(written);
+ } else {
+ self.#writeCallback = null;
+ self.#writeChunk = null;
+ callback(null);
+ }
+ }
+ }
+
+ bytesRead = 0;
+ bytesWritten = 0;
+ #closed = false;
+ connecting = false;
+ localAddress = "127.0.0.1";
+ #readQueue = createFIFO();
+ remotePort;
+ #socket;
+ timeout = 0;
+ #writeCallback;
+ #writeChunk;
+
+ constructor(options) {
+ super({
+ allowHalfOpen: options?.allowHalfOpen || false,
+ readable: true,
+ writable: true,
+ });
+ options?.signal?.once("abort", () => this.destroy());
+ this.once("connect", () => this.emit("ready"));
+ // TODO support `options.fd`
+ }
+
+ address() {
+ return {
+ address: this.localAddress,
+ family: this.localFamily,
+ port: this.localPort,
+ };
+ }
+
+ get bufferSize() {
+ return this.writableLength;
+ }
+
+ connect(port, host, connectListener) {
+ // TODO support IPC sockets
+ if (typeof host == "function") {
+ connectListener = host;
+ host = undefined;
+ }
+ if (typeof port == "object") {
+ var {
+ port,
+ host,
+ // TODOs
+ localAddress,
+ localPort,
+ family,
+ hints,
+ lookup,
+ noDelay,
+ keepAlive,
+ keepAliveInitialDelay,
+ } = port;
+ }
+ this.connecting = true;
+ this.remotePort = port;
+ if (connectListener) this.on("connect", connectListener);
+ bunConnect({
+ data: this,
+ hostname: host || "localhost",
+ port: port,
+ socket: Socket.#Handlers,
+ });
+ return this;
+ }
+
+ _destroy(err, callback) {
+ this.#socket?.end();
+ callback(err);
+ }
+
+ _final(callback) {
+ this.#socket.end();
+ callback();
+ }
+
+ get localAddress() {
+ return "127.0.0.1";
+ }
+
+ get localFamily() {
+ return "IPv4";
+ }
+
+ get localPort() {
+ return this.#socket?.localPort;
+ }
+
+ get pending() {
+ return this.connecting;
+ }
+
+ _read(size) {
+ const queue = this.#readQueue;
+ let chunk;
+ while (chunk = queue.peek()) {
+ if (!this.push(chunk)) break;
+ queue.shift();
+ }
+ }
+
+ get readyState() {
+ if (this.connecting) return "opening";
+ if (this.readable) {
+ return this.writable ? "open" : "readOnly";
+ } else {
+ return this.writable ? "writeOnly" : "closed";
+ }
+ }
+
+ ref() {
+ this.#socket?.ref();
+ }
+
+ get remoteAddress() {
+ return this.#socket.remoteAddress;
+ }
+
+ get remoteFamily() {
+ return "IPv4";
+ }
+
+ resetAndDestroy() {
+ this.#socket?.end();
+ }
+
+ setKeepAlive(enable = false, initialDelay = 0) {
+ // TODO
+ }
+
+ setNoDelay(noDelay = true) {
+ // TODO
+ }
+
+ setTimeout(timeout, callback) {
+ this.#socket?.timeout(timeout);
+ this.timeout = timeout;
+ if (callback) this.once("timeout", callback);
+ return this;
+ }
+
+ unref() {
+ this.#socket?.unref();
+ }
+
+ _write(chunk, encoding, callback) {
+ if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding);
+ var written = this.#socket?.write(chunk);
+ if (written == chunk.length) {
+ callback();
+ } else if (this.#writeCallback) {
+ callback(new Error("overlapping _write()"));
+ } else {
+ if (written > 0) chunk = chunk.slice(written);
+ this.#writeCallback = callback;
+ this.#writeChunk = chunk;
+ }
+ }
+}
+
+export function createConnection(port, host, connectListener) {
+ if (typeof host == "function") {
+ connectListener = host;
+ host = undefined;
+ }
+ var options = typeof port == "object" ? port : {
+ host: host,
+ port: port,
+ };
+ return new Socket(options).connect(options, connectListener);
+}
+
+export const connect = createConnection;
+
export default {
+ createConnection,
+ connect,
isIP,
isIPv4,
isIPv6,
+ Socket,
[Symbol.for("CommonJS")]: 0,
};