diff options
-rw-r--r-- | src/bun.js/http.exports.js | 117 | ||||
-rw-r--r-- | src/bun.js/node/syscall.zig | 1 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 7 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 14 | ||||
-rw-r--r-- | test/bun.js/node-http.test.ts | 72 | ||||
-rw-r--r-- | test/bun.js/string-decoder.test.js | 6 |
6 files changed, 178 insertions, 39 deletions
diff --git a/src/bun.js/http.exports.js b/src/bun.js/http.exports.js index d0a669d7b..9db81a3a7 100644 --- a/src/bun.js/http.exports.js +++ b/src/bun.js/http.exports.js @@ -1,5 +1,8 @@ const { EventEmitter } = import.meta.require("node:events"); const { Readable, Writable } = import.meta.require("node:stream"); + +const { newArrayWithSize, isPromise } = import.meta.primordials; + export function createServer(options, callback) { return new Server(options, callback); } @@ -93,7 +96,7 @@ export class Server extends EventEmitter { function assignHeaders(object, req) { var headers = req.headers.toJSON(); - const rawHeaders = new Array(req.headers.count * 2); + const rawHeaders = newArrayWithSize(req.headers.count * 2); var i = 0; for (const key in headers) { rawHeaders[i++] = key; @@ -102,7 +105,9 @@ function assignHeaders(object, req) { object.headers = headers; object.rawHeaders = rawHeaders; } - +function destroyBodyStreamNT(bodyStream) { + bodyStream.destroy(); +} export class IncomingMessage extends Readable { constructor(req) { const method = req.method; @@ -111,7 +116,7 @@ export class IncomingMessage extends Readable { const url = new URL(req.url); - this._no_body = + this.#noBody = "GET" === method || "HEAD" === method || "TRACE" === method || @@ -119,13 +124,12 @@ export class IncomingMessage extends Readable { "OPTIONS" === method || (parseInt(req.headers.get("Content-Length") || "") || 0) === 0; - this._req = req; + this.#req = req; this.method = method; - this.complete = !!this._no_body; - this._body_offset = 0; + this.complete = !!this.#noBody; - this._body = undefined; - this._socket = undefined; + this.#bodyStream = null; + this.#socket = undefined; this.url = url.pathname; assignHeaders(this, req); @@ -135,53 +139,78 @@ export class IncomingMessage extends Readable { rawHeaders; _consuming = false; _dumped = false; - _body; - _body_offset; - _socket; - _no_body; - _req; + #bodyStream = null; + #socket = undefined; + #noBody = false; + #aborted = false; + #req; url; _construct(callback) { // TODO: streaming - if (this._no_body) { + if (this.#noBody) { callback(); return; } - (async () => { - try { - this._body = Buffer.from(await this._req.arrayBuffer()); + const contentLength = this.#req.headers.get("content-length"); + const length = contentLength ? parseInt(contentLength, 10) : 0; - callback(); - } catch (err) { - callback(err); - } - })(); + if (length === 0) { + this.#noBody = true; + callback(); + return; + } + + callback(); + } + + #closeBodyStream() { + var bodyStream = this.#bodyStream; + if (bodyStream == null) return; + this.complete = true; + this.#bodyStream = undefined; + this.push(null); + // process.nextTick(destroyBodyStreamNT, bodyStream); } _read(size) { - if (this._no_body) { + if (this.#noBody) { this.push(null); this.complete = true; + } else if (this.#bodyStream === null) { + const contentLength = this.#req.headers.get("content-length"); + var remaining = contentLength ? parseInt(contentLength, 10) : 0; + this.#bodyStream = Readable.fromWeb(this.#req.body, { + highWaterMark: Number.isFinite(remaining) + ? Math.min(remaining, 16384) + : 16384, + }); + + this.#bodyStream.on("data", (chunk) => { + this.push(chunk); + remaining -= chunk?.byteLength ?? 0; + if (remaining <= 0) { + this.#closeBodyStream(); + } + }); + this.#bodyStream.on("end", () => { + this.#closeBodyStream(); + }); } else { - if (this._body_offset >= this._body.length) { - this.push(null); - this.complete = true; - } else { - this.push( - this._body.subarray(this._body_offset, (this._body_offset += size)), - ); - } + // this.#bodyStream.read(size); } } get aborted() { - return false; + return this.#aborted; } abort() { - throw new Error("not implemented"); + if (this.#aborted) return; + this.#aborted = true; + + this.#closeBodyStream(); } get connection() { @@ -217,10 +246,10 @@ export class IncomingMessage extends Readable { } get socket() { - var _socket = this._socket; + var _socket = this.#socket; if (_socket) return _socket; - this._socket = _socket = new EventEmitter(); + this.#socket = _socket = new EventEmitter(); this.on("end", () => _socket.emit("end")); this.on("close", () => _socket.emit("close")); @@ -245,6 +274,7 @@ export class ServerResponse extends Writable { this.#controller = undefined; this.#firstWrite = undefined; this._writableState.decodeStrings = false; + this.#deferred = undefined; } req; @@ -260,6 +290,8 @@ export class ServerResponse extends Writable { _defaultKeepAlive = false; _removedConnection = false; _removedContLen = false; + #deferred = undefined; + #finished = false; #fakeSocket; @@ -307,6 +339,11 @@ export class ServerResponse extends Writable { if (firstWrite) controller.write(firstWrite); firstWrite = undefined; run(controller); + if (!this.#finished) { + return new Promise((resolve) => { + this.#deferred = resolve; + }); + } }, }), { @@ -322,6 +359,7 @@ export class ServerResponse extends Writable { if (!this.headersSent) { var data = this.#firstWrite || ""; this.#firstWrite = undefined; + this.#finished = true; this._reply( new Response(data, { headers: this.#headers, @@ -333,9 +371,16 @@ export class ServerResponse extends Writable { return; } + this.#finished = true; this.#ensureReadableStreamController((controller) => { - controller.close(); + controller.end(); + callback(); + var deferred = this.#deferred; + if (deferred) { + this.#deferred = undefined; + deferred(); + } }); } diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig index 14cee4e36..e2d197073 100644 --- a/src/bun.js/node/syscall.zig +++ b/src/bun.js/node/syscall.zig @@ -204,6 +204,7 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) // That error is not unreachable for us pub fn close(fd: std.os.fd_t) ?Syscall.Error { log("close({d})", .{fd}); + std.debug.assert(fd != JSC.Node.invalid_fd); if (comptime Environment.isMac) { // This avoids the EINTR problem. return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) { diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 7e52312ad..28481dbb3 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -5764,7 +5764,12 @@ function createNativeStream(nativeType, Readable) { #internalConstruct(ptr) { this.#constructed = true; - start(ptr, this.#highWaterMark); + const result = start(ptr, this.#highWaterMark); + + if (typeof result === "number" && result > 1) { + this.#hasResized = true; + this.#highWaterMark = Math.min(this.#highWaterMark, result); + } if (drainFn) { const drainResult = drainFn(ptr); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 7bb1c8f17..cac5fbafd 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -2929,6 +2929,18 @@ pub const ByteBlobLoader = struct { bun.default_allocator.destroy(this); } + pub fn drain(this: *ByteBlobLoader) bun.ByteList { + var temporary = this.store.sharedView(); + temporary = temporary[this.offset..]; + temporary = temporary[0..@minimum(16384, @minimum(temporary.len, this.remain))]; + + var cloned = bun.ByteList.init(temporary).listManaged(bun.default_allocator).clone() catch @panic("Out of memory"); + this.offset +|= @truncate(Blob.SizeType, cloned.items.len); + this.remain -|= @truncate(Blob.SizeType, cloned.items.len); + + return bun.ByteList.fromList(cloned); + } + pub const Source = ReadableStreamSource( @This(), "ByteBlob", @@ -2937,7 +2949,7 @@ pub const ByteBlobLoader = struct { onCancel, deinit, null, - null, + drain, ); }; diff --git a/test/bun.js/node-http.test.ts b/test/bun.js/node-http.test.ts new file mode 100644 index 000000000..24b3f1c85 --- /dev/null +++ b/test/bun.js/node-http.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "bun:test"; +import { createServer } from "node:http"; + +describe("node:http", () => { + describe("createServer", async () => { + it("hello world", async () => { + const server = createServer((req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("Hello World"); + }); + server.listen(8123); + + const res = await fetch("http://localhost:8123"); + expect(await res.text()).toBe("Hello World"); + server.close(); + }); + + it("request & response body streaming (large)", async () => { + const bodyBlob = new Blob(["hello world", "hello world".repeat(9000)]); + + const input = await bodyBlob.text(); + + const server = createServer((req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + req.on("data", (chunk) => { + res.write(chunk); + }); + + req.on("end", () => { + res.end(); + }); + }); + server.listen(8124); + + const res = await fetch("http://localhost:8124", { + method: "POST", + body: bodyBlob, + }); + + const out = await res.text(); + expect(out).toBe(input); + server.close(); + }); + + it("request & response body streaming (small)", async () => { + const bodyBlob = new Blob(["hello world", "hello world".repeat(4)]); + + const input = await bodyBlob.text(); + + const server = createServer((req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + req.on("data", (chunk) => { + res.write(chunk); + }); + + req.on("end", () => { + res.end(); + }); + }); + server.listen(8125); + + const res = await fetch("http://localhost:8125", { + method: "POST", + body: bodyBlob, + }); + + const out = await res.text(); + expect(out).toBe(input); + server.close(); + }); + }); +}); diff --git a/test/bun.js/string-decoder.test.js b/test/bun.js/string-decoder.test.js index 4664cc388..a29577acc 100644 --- a/test/bun.js/string-decoder.test.js +++ b/test/bun.js/string-decoder.test.js @@ -1,5 +1,9 @@ import { expect, it } from "bun:test"; -import { StringDecoder } from "string_decoder"; +var { StringDecoder } = require("string_decoder"); + +it("require('string_decoder')", () => { + expect(StringDecoder1).toBe(StringDecoder); +}); it("StringDecoder-utf8", () => { test("utf-8", Buffer.from("$", "utf-8"), "$"); |