aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bun.js/net.exports.js248
-rw-r--r--src/bun.js/test/jest.zig12
-rw-r--r--test/bun.js/socket/node-net.test.ts175
3 files changed, 432 insertions, 3 deletions
diff --git a/src/bun.js/net.exports.js b/src/bun.js/net.exports.js
index fee0c9afa..47e4b68db 100644
--- a/src/bun.js/net.exports.js
+++ b/src/bun.js/net.exports.js
@@ -53,9 +53,257 @@ export function isIP(s) {
return 0;
}
+const { Bun, createFIFO } = import.meta.primordials;
+const { connect: bunConnect } = Bun;
+const { Duplex } = import.meta.require("node:stream");
+
+export class Socket extends Duplex {
+ static #Handlers = {
+ close: Socket.#Close,
+ data(socket, buffer) {
+ const self = socket.data;
+ self.bytesRead += buffer.length;
+ const queue = self.#readQueue;
+ if (queue.isEmpty()) {
+ if (self.push(buffer)) return;
+ }
+ queue.push(buffer);
+ },
+ drain: Socket.#Drain,
+ end: Socket.#Close,
+ error(socket, error) {
+ const self = socket.data;
+ const callback = self.#writeCallback;
+ if (callback) {
+ self.#writeCallback = null;
+ callback(error);
+ }
+ self.emit("error", error);
+ },
+ open(socket) {
+ const self = socket.data;
+ socket.timeout(self.timeout);
+ self.#socket = socket;
+ self.connecting = false;
+ self.emit("connect");
+ Socket.#Drain(socket);
+ },
+ timeout() {
+ const self = socket.data;
+ self.emit("timeout");
+ },
+ };
+
+ static #Close(socket) {
+ const self = socket.data;
+ if (self.#closed) return;
+ self.#closed = true;
+ const queue = self.#readQueue;
+ if (queue.isEmpty()) {
+ if (self.push(null)) return;
+ }
+ queue.push(null);
+ }
+
+ static #Drain(socket) {
+ const self = socket.data;
+ const callback = self.#writeCallback;
+ if (callback) {
+ const chunk = self.#writeChunk;
+ const written = socket.write(chunk);
+ self.bytesWritten += written;
+ if (written < chunk.length) {
+ self.#writeChunk = chunk.slice(written);
+ } else {
+ self.#writeCallback = null;
+ self.#writeChunk = null;
+ callback(null);
+ }
+ }
+ }
+
+ bytesRead = 0;
+ bytesWritten = 0;
+ #closed = false;
+ connecting = false;
+ localAddress = "127.0.0.1";
+ #readQueue = createFIFO();
+ remotePort;
+ #socket;
+ timeout = 0;
+ #writeCallback;
+ #writeChunk;
+
+ constructor(options) {
+ super({
+ allowHalfOpen: options?.allowHalfOpen || false,
+ readable: true,
+ writable: true,
+ });
+ options?.signal?.once("abort", () => this.destroy());
+ this.once("connect", () => this.emit("ready"));
+ // TODO support `options.fd`
+ }
+
+ address() {
+ return {
+ address: this.localAddress,
+ family: this.localFamily,
+ port: this.localPort,
+ };
+ }
+
+ get bufferSize() {
+ return this.writableLength;
+ }
+
+ connect(port, host, connectListener) {
+ // TODO support IPC sockets
+ if (typeof host == "function") {
+ connectListener = host;
+ host = undefined;
+ }
+ if (typeof port == "object") {
+ var {
+ port,
+ host,
+ // TODOs
+ localAddress,
+ localPort,
+ family,
+ hints,
+ lookup,
+ noDelay,
+ keepAlive,
+ keepAliveInitialDelay,
+ } = port;
+ }
+ this.connecting = true;
+ this.remotePort = port;
+ if (connectListener) this.on("connect", connectListener);
+ bunConnect({
+ data: this,
+ hostname: host || "localhost",
+ port: port,
+ socket: Socket.#Handlers,
+ });
+ return this;
+ }
+
+ _destroy(err, callback) {
+ this.#socket?.end();
+ callback(err);
+ }
+
+ _final(callback) {
+ this.#socket.end();
+ callback();
+ }
+
+ get localAddress() {
+ return "127.0.0.1";
+ }
+
+ get localFamily() {
+ return "IPv4";
+ }
+
+ get localPort() {
+ return this.#socket?.localPort;
+ }
+
+ get pending() {
+ return this.connecting;
+ }
+
+ _read(size) {
+ const queue = this.#readQueue;
+ let chunk;
+ while (chunk = queue.peek()) {
+ if (!this.push(chunk)) break;
+ queue.shift();
+ }
+ }
+
+ get readyState() {
+ if (this.connecting) return "opening";
+ if (this.readable) {
+ return this.writable ? "open" : "readOnly";
+ } else {
+ return this.writable ? "writeOnly" : "closed";
+ }
+ }
+
+ ref() {
+ this.#socket?.ref();
+ }
+
+ get remoteAddress() {
+ return this.#socket.remoteAddress;
+ }
+
+ get remoteFamily() {
+ return "IPv4";
+ }
+
+ resetAndDestroy() {
+ this.#socket?.end();
+ }
+
+ setKeepAlive(enable = false, initialDelay = 0) {
+ // TODO
+ }
+
+ setNoDelay(noDelay = true) {
+ // TODO
+ }
+
+ setTimeout(timeout, callback) {
+ this.#socket?.timeout(timeout);
+ this.timeout = timeout;
+ if (callback) this.once("timeout", callback);
+ return this;
+ }
+
+ unref() {
+ this.#socket?.unref();
+ }
+
+ _write(chunk, encoding, callback) {
+ if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding);
+ var written = this.#socket?.write(chunk);
+ if (written == chunk.length) {
+ callback();
+ } else if (this.#writeCallback) {
+ callback(new Error("overlapping _write()"));
+ } else {
+ if (written > 0) chunk = chunk.slice(written);
+ this.#writeCallback = callback;
+ this.#writeChunk = chunk;
+ }
+ }
+}
+
+export function createConnection(port, host, connectListener) {
+ if (typeof host == "function") {
+ connectListener = host;
+ host = undefined;
+ }
+ var options = typeof port == "object" ? port : {
+ host: host,
+ port: port,
+ };
+ return new Socket(options).connect(options, connectListener);
+}
+
+export const connect = createConnection;
+
export default {
+ createConnection,
+ connect,
isIP,
isIPv4,
isIPv6,
+ Socket,
[Symbol.for("CommonJS")]: 0,
};
diff --git a/src/bun.js/test/jest.zig b/src/bun.js/test/jest.zig
index 6870bb93e..24c1bd0e1 100644
--- a/src/bun.js/test/jest.zig
+++ b/src/bun.js/test/jest.zig
@@ -1275,8 +1275,12 @@ pub const TestScope = struct {
JSC.setFunctionData(function, null);
if (args.len > 0) {
const err = args.ptr[0];
- globalThis.bunVM().runErrorHandlerWithDedupe(err, null);
- task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback);
+ if (err.isEmptyOrUndefinedOrNull()) {
+ task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback);
+ } else {
+ globalThis.bunVM().runErrorHandlerWithDedupe(err, null);
+ task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback);
+ }
} else {
task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback);
}
@@ -1510,7 +1514,9 @@ pub const DescribeScope = struct {
JSC.setFunctionData(function, null);
if (args.len > 0) {
const err = args.ptr[0];
- ctx.bunVM().runErrorHandlerWithDedupe(err, null);
+ if (!err.isEmptyOrUndefinedOrNull()) {
+ ctx.bunVM().runErrorHandlerWithDedupe(err, null);
+ }
}
scope.done = true;
}
diff --git a/test/bun.js/socket/node-net.test.ts b/test/bun.js/socket/node-net.test.ts
new file mode 100644
index 000000000..47c9964cf
--- /dev/null
+++ b/test/bun.js/socket/node-net.test.ts
@@ -0,0 +1,175 @@
+import { afterAll, beforeAll, beforeEach, describe, expect, it } from "bun:test";
+import { isIP, isIPv4, isIPv6, Socket } from "net";
+
+it("should support net.isIP()", () => {
+ expect(isIP("::1")).toBe(6);
+ expect(isIP("foobar")).toBe(0);
+ expect(isIP("127.0.0.1")).toBe(4);
+ expect(isIP("127.0.0.1/24")).toBe(0);
+ expect(isIP("127.000.000.001")).toBe(0);
+});
+
+it("should support net.isIPv4()", () => {
+ expect(isIPv4("::1")).toBe(false);
+ expect(isIPv4("foobar")).toBe(false);
+ expect(isIPv4("127.0.0.1")).toBe(true);
+ expect(isIPv4("127.0.0.1/24")).toBe(false);
+ expect(isIPv4("127.000.000.001")).toBe(false);
+});
+
+it("should support net.isIPv6()", () => {
+ expect(isIPv6("::1")).toBe(true);
+ expect(isIPv6("foobar")).toBe(false);
+ expect(isIPv6("127.0.0.1")).toBe(false);
+ expect(isIPv6("127.0.0.1/24")).toBe(false);
+ expect(isIPv6("127.000.000.001")).toBe(false);
+});
+
+describe("net.Socket read", () => {
+ const message = "Hello World!".repeat(1024);
+ const port = 12345;
+ let erred, server;
+
+ beforeAll(() => {
+ function drain(socket) {
+ const message = socket.data.message;
+ const written = socket.write(message);
+ if (written < message.length) {
+ socket.data.message = message.slice(written);
+ } else {
+ socket.end();
+ }
+ }
+
+ server = Bun.listen({
+ hostname: "localhost",
+ port: port,
+ socket: {
+ open(socket) {
+ socket.data.message = message;
+ drain(socket);
+ },
+ drain,
+ error(socket, err) {
+ erred = err;
+ },
+ },
+ data: {
+ message: "",
+ },
+ });
+ });
+
+ beforeEach(() => {
+ erred = undefined;
+ });
+
+ it("should work with .connect(port)", done => {
+ var data = "";
+ const socket = new Socket().connect(port).on("connect", () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).setEncoding("utf8").on("data", chunk => {
+ data += chunk;
+ }).on("end", () => {
+ expect(data).toBe(message);
+ done(erred);
+ }).on("error", done);
+ });
+
+ it("should work with .connect(port, listener)", done => {
+ var data = "";
+ const socket = new Socket().connect(port, () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).setEncoding("utf8").on("data", chunk => {
+ data += chunk;
+ }).on("end", () => {
+ expect(data).toBe(message);
+ done(erred);
+ }).on("error", done);
+ });
+
+ it("should work with .connect(port, host, listener)", done => {
+ var data = "";
+ const socket = new Socket().connect(port, "localhost", () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).setEncoding("utf8").on("data", chunk => {
+ data += chunk;
+ }).on("end", () => {
+ expect(data).toBe(message);
+ done(erred);
+ }).on("error", done);
+ });
+
+ afterAll(() => server.stop());
+});
+
+describe("net.Socket write", () => {
+ const message = "Hello World!".repeat(1024);
+ const port = 54321;
+ let onClose, server;
+
+ beforeAll(() => {
+ function close(socket) {
+ if (onClose) {
+ const done = onClose;
+ onClose = null;
+ expect(Buffer.concat(socket.data).toString("utf8")).toBe(message);
+ done();
+ }
+ }
+
+ server = Bun.listen({
+ hostname: "localhost",
+ port: port,
+ socket: {
+ close,
+ data(socket, buffer) {
+ socket.data.push(buffer);
+ },
+ end: close,
+ error(socket, err) {
+ onClose(err);
+ },
+ open(socket) {
+ socket.data = [];
+ },
+ },
+ });
+ });
+
+ it("should work with .end(data)", done => {
+ onClose = done;
+ const socket = new Socket().connect(port).on("ready", () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).on("error", done).end(message);
+ });
+
+ it("should work with .write(data).end()", done => {
+ onClose = done;
+ const socket = new Socket().connect(port, () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).on("error", done);
+ socket.write(message);
+ socket.end();
+ });
+
+ it("should work with multiple .write()s", done => {
+ onClose = done;
+ const socket = new Socket().connect(port, "localhost", () => {
+ expect(socket).toBeDefined();
+ expect(socket.connecting).toBe(false);
+ }).on("error", done);
+ const size = 10;
+ for (let i = 0; i < message.length; i += size) {
+ socket.write(message.slice(i, i + size));
+ }
+ socket.end();
+ });
+
+ afterAll(() => server.stop());
+});