diff options
-rw-r--r-- | src/bun.js/net.exports.js | 248 | ||||
-rw-r--r-- | src/bun.js/test/jest.zig | 12 | ||||
-rw-r--r-- | test/bun.js/socket/node-net.test.ts | 175 |
3 files changed, 432 insertions, 3 deletions
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js index fee0c9afa..47e4b68db 100644 --- a/src/bun.js/net.exports.js +++ b/src/bun.js/net.exports.js @@ -53,9 +53,257 @@ export function isIP(s) { return 0; } +const { Bun, createFIFO } = import.meta.primordials; +const { connect: bunConnect } = Bun; +const { Duplex } = import.meta.require("node:stream"); + +export class Socket extends Duplex { + static #Handlers = { + close: Socket.#Close, + data(socket, buffer) { + const self = socket.data; + self.bytesRead += buffer.length; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(buffer)) return; + } + queue.push(buffer); + }, + drain: Socket.#Drain, + end: Socket.#Close, + error(socket, error) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + self.#writeCallback = null; + callback(error); + } + self.emit("error", error); + }, + open(socket) { + const self = socket.data; + socket.timeout(self.timeout); + self.#socket = socket; + self.connecting = false; + self.emit("connect"); + Socket.#Drain(socket); + }, + timeout() { + const self = socket.data; + self.emit("timeout"); + }, + }; + + static #Close(socket) { + const self = socket.data; + if (self.#closed) return; + self.#closed = true; + const queue = self.#readQueue; + if (queue.isEmpty()) { + if (self.push(null)) return; + } + queue.push(null); + } + + static #Drain(socket) { + const self = socket.data; + const callback = self.#writeCallback; + if (callback) { + const chunk = self.#writeChunk; + const written = socket.write(chunk); + self.bytesWritten += written; + if (written < chunk.length) { + self.#writeChunk = chunk.slice(written); + } else { + self.#writeCallback = null; + self.#writeChunk = null; + callback(null); + } + } + } + + bytesRead = 0; + bytesWritten = 0; + #closed = false; + connecting = false; + localAddress = "127.0.0.1"; + #readQueue = createFIFO(); + remotePort; + #socket; + timeout = 0; + #writeCallback; + #writeChunk; + + constructor(options) { + super({ + allowHalfOpen: options?.allowHalfOpen || false, + readable: true, + writable: true, + }); + options?.signal?.once("abort", () => this.destroy()); + this.once("connect", () => this.emit("ready")); + // TODO support `options.fd` + } + + address() { + return { + address: this.localAddress, + family: this.localFamily, + port: this.localPort, + }; + } + + get bufferSize() { + return this.writableLength; + } + + connect(port, host, connectListener) { + // TODO support IPC sockets + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + if (typeof port == "object") { + var { + port, + host, + // TODOs + localAddress, + localPort, + family, + hints, + lookup, + noDelay, + keepAlive, + keepAliveInitialDelay, + } = port; + } + this.connecting = true; + this.remotePort = port; + if (connectListener) this.on("connect", connectListener); + bunConnect({ + data: this, + hostname: host || "localhost", + port: port, + socket: Socket.#Handlers, + }); + return this; + } + + _destroy(err, callback) { + this.#socket?.end(); + callback(err); + } + + _final(callback) { + this.#socket.end(); + callback(); + } + + get localAddress() { + return "127.0.0.1"; + } + + get localFamily() { + return "IPv4"; + } + + get localPort() { + return this.#socket?.localPort; + } + + get pending() { + return this.connecting; + } + + _read(size) { + const queue = this.#readQueue; + let chunk; + while (chunk = queue.peek()) { + if (!this.push(chunk)) break; + queue.shift(); + } + } + + get readyState() { + if (this.connecting) return "opening"; + if (this.readable) { + return this.writable ? "open" : "readOnly"; + } else { + return this.writable ? "writeOnly" : "closed"; + } + } + + ref() { + this.#socket?.ref(); + } + + get remoteAddress() { + return this.#socket.remoteAddress; + } + + get remoteFamily() { + return "IPv4"; + } + + resetAndDestroy() { + this.#socket?.end(); + } + + setKeepAlive(enable = false, initialDelay = 0) { + // TODO + } + + setNoDelay(noDelay = true) { + // TODO + } + + setTimeout(timeout, callback) { + this.#socket?.timeout(timeout); + this.timeout = timeout; + if (callback) this.once("timeout", callback); + return this; + } + + unref() { + this.#socket?.unref(); + } + + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding); + var written = this.#socket?.write(chunk); + if (written == chunk.length) { + callback(); + } else if (this.#writeCallback) { + callback(new Error("overlapping _write()")); + } else { + if (written > 0) chunk = chunk.slice(written); + this.#writeCallback = callback; + this.#writeChunk = chunk; + } + } +} + +export function createConnection(port, host, connectListener) { + if (typeof host == "function") { + connectListener = host; + host = undefined; + } + var options = typeof port == "object" ? port : { + host: host, + port: port, + }; + return new Socket(options).connect(options, connectListener); +} + +export const connect = createConnection; + export default { + createConnection, + connect, isIP, isIPv4, isIPv6, + Socket, [Symbol.for("CommonJS")]: 0, }; diff --git a/src/bun.js/test/jest.zig b/src/bun.js/test/jest.zig index 6870bb93e..24c1bd0e1 100644 --- a/src/bun.js/test/jest.zig +++ b/src/bun.js/test/jest.zig @@ -1275,8 +1275,12 @@ pub const TestScope = struct { JSC.setFunctionData(function, null); if (args.len > 0) { const err = args.ptr[0]; - globalThis.bunVM().runErrorHandlerWithDedupe(err, null); - task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback); + if (err.isEmptyOrUndefinedOrNull()) { + task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback); + } else { + globalThis.bunVM().runErrorHandlerWithDedupe(err, null); + task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback); + } } else { task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback); } @@ -1510,7 +1514,9 @@ pub const DescribeScope = struct { JSC.setFunctionData(function, null); if (args.len > 0) { const err = args.ptr[0]; - ctx.bunVM().runErrorHandlerWithDedupe(err, null); + if (!err.isEmptyOrUndefinedOrNull()) { + ctx.bunVM().runErrorHandlerWithDedupe(err, null); + } } scope.done = true; } diff --git a/test/bun.js/socket/node-net.test.ts b/test/bun.js/socket/node-net.test.ts new file mode 100644 index 000000000..47c9964cf --- /dev/null +++ b/test/bun.js/socket/node-net.test.ts @@ -0,0 +1,175 @@ +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "bun:test"; +import { isIP, isIPv4, isIPv6, Socket } from "net"; + +it("should support net.isIP()", () => { + expect(isIP("::1")).toBe(6); + expect(isIP("foobar")).toBe(0); + expect(isIP("127.0.0.1")).toBe(4); + expect(isIP("127.0.0.1/24")).toBe(0); + expect(isIP("127.000.000.001")).toBe(0); +}); + +it("should support net.isIPv4()", () => { + expect(isIPv4("::1")).toBe(false); + expect(isIPv4("foobar")).toBe(false); + expect(isIPv4("127.0.0.1")).toBe(true); + expect(isIPv4("127.0.0.1/24")).toBe(false); + expect(isIPv4("127.000.000.001")).toBe(false); +}); + +it("should support net.isIPv6()", () => { + expect(isIPv6("::1")).toBe(true); + expect(isIPv6("foobar")).toBe(false); + expect(isIPv6("127.0.0.1")).toBe(false); + expect(isIPv6("127.0.0.1/24")).toBe(false); + expect(isIPv6("127.000.000.001")).toBe(false); +}); + +describe("net.Socket read", () => { + const message = "Hello World!".repeat(1024); + const port = 12345; + let erred, server; + + beforeAll(() => { + function drain(socket) { + const message = socket.data.message; + const written = socket.write(message); + if (written < message.length) { + socket.data.message = message.slice(written); + } else { + socket.end(); + } + } + + server = Bun.listen({ + hostname: "localhost", + port: port, + socket: { + open(socket) { + socket.data.message = message; + drain(socket); + }, + drain, + error(socket, err) { + erred = err; + }, + }, + data: { + message: "", + }, + }); + }); + + beforeEach(() => { + erred = undefined; + }); + + it("should work with .connect(port)", done => { + var data = ""; + const socket = new Socket().connect(port).on("connect", () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).setEncoding("utf8").on("data", chunk => { + data += chunk; + }).on("end", () => { + expect(data).toBe(message); + done(erred); + }).on("error", done); + }); + + it("should work with .connect(port, listener)", done => { + var data = ""; + const socket = new Socket().connect(port, () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).setEncoding("utf8").on("data", chunk => { + data += chunk; + }).on("end", () => { + expect(data).toBe(message); + done(erred); + }).on("error", done); + }); + + it("should work with .connect(port, host, listener)", done => { + var data = ""; + const socket = new Socket().connect(port, "localhost", () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).setEncoding("utf8").on("data", chunk => { + data += chunk; + }).on("end", () => { + expect(data).toBe(message); + done(erred); + }).on("error", done); + }); + + afterAll(() => server.stop()); +}); + +describe("net.Socket write", () => { + const message = "Hello World!".repeat(1024); + const port = 54321; + let onClose, server; + + beforeAll(() => { + function close(socket) { + if (onClose) { + const done = onClose; + onClose = null; + expect(Buffer.concat(socket.data).toString("utf8")).toBe(message); + done(); + } + } + + server = Bun.listen({ + hostname: "localhost", + port: port, + socket: { + close, + data(socket, buffer) { + socket.data.push(buffer); + }, + end: close, + error(socket, err) { + onClose(err); + }, + open(socket) { + socket.data = []; + }, + }, + }); + }); + + it("should work with .end(data)", done => { + onClose = done; + const socket = new Socket().connect(port).on("ready", () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).on("error", done).end(message); + }); + + it("should work with .write(data).end()", done => { + onClose = done; + const socket = new Socket().connect(port, () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).on("error", done); + socket.write(message); + socket.end(); + }); + + it("should work with multiple .write()s", done => { + onClose = done; + const socket = new Socket().connect(port, "localhost", () => { + expect(socket).toBeDefined(); + expect(socket.connecting).toBe(false); + }).on("error", done); + const size = 10; + for (let i = 0; i < message.length; i += size) { + socket.write(message.slice(i, i + size)); + } + socket.end(); + }); + + afterAll(() => server.stop()); +}); |