aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-16 04:42:33 -0800
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-11-16 04:42:33 -0800
commitbf6b1742330343c6004789f02761426aeafdb47b (patch)
tree2c0c05672c7ee1d1cd3406d21e4e54aac370ed99
parent5de98f23bb4e69e54ea173039ce9ba5a0839e968 (diff)
downloadbun-bf6b1742330343c6004789f02761426aeafdb47b.tar.gz
bun-bf6b1742330343c6004789f02761426aeafdb47b.tar.zst
bun-bf6b1742330343c6004789f02761426aeafdb47b.zip
Make `node:http`.createServer work better
-rw-r--r--src/bun.js/http.exports.js117
-rw-r--r--src/bun.js/node/syscall.zig1
-rw-r--r--src/bun.js/streams.exports.js7
-rw-r--r--src/bun.js/webcore/streams.zig14
-rw-r--r--test/bun.js/node-http.test.ts72
-rw-r--r--test/bun.js/string-decoder.test.js6
6 files changed, 178 insertions, 39 deletions
diff --git a/src/bun.js/http.exports.js b/src/bun.js/http.exports.js
index d0a669d7b..9db81a3a7 100644
--- a/src/bun.js/http.exports.js
+++ b/src/bun.js/http.exports.js
@@ -1,5 +1,8 @@
const { EventEmitter } = import.meta.require("node:events");
const { Readable, Writable } = import.meta.require("node:stream");
+
+const { newArrayWithSize, isPromise } = import.meta.primordials;
+
export function createServer(options, callback) {
return new Server(options, callback);
}
@@ -93,7 +96,7 @@ export class Server extends EventEmitter {
function assignHeaders(object, req) {
var headers = req.headers.toJSON();
- const rawHeaders = new Array(req.headers.count * 2);
+ const rawHeaders = newArrayWithSize(req.headers.count * 2);
var i = 0;
for (const key in headers) {
rawHeaders[i++] = key;
@@ -102,7 +105,9 @@ function assignHeaders(object, req) {
object.headers = headers;
object.rawHeaders = rawHeaders;
}
-
+function destroyBodyStreamNT(bodyStream) {
+ bodyStream.destroy();
+}
export class IncomingMessage extends Readable {
constructor(req) {
const method = req.method;
@@ -111,7 +116,7 @@ export class IncomingMessage extends Readable {
const url = new URL(req.url);
- this._no_body =
+ this.#noBody =
"GET" === method ||
"HEAD" === method ||
"TRACE" === method ||
@@ -119,13 +124,12 @@ export class IncomingMessage extends Readable {
"OPTIONS" === method ||
(parseInt(req.headers.get("Content-Length") || "") || 0) === 0;
- this._req = req;
+ this.#req = req;
this.method = method;
- this.complete = !!this._no_body;
- this._body_offset = 0;
+ this.complete = !!this.#noBody;
- this._body = undefined;
- this._socket = undefined;
+ this.#bodyStream = null;
+ this.#socket = undefined;
this.url = url.pathname;
assignHeaders(this, req);
@@ -135,53 +139,78 @@ export class IncomingMessage extends Readable {
rawHeaders;
_consuming = false;
_dumped = false;
- _body;
- _body_offset;
- _socket;
- _no_body;
- _req;
+ #bodyStream = null;
+ #socket = undefined;
+ #noBody = false;
+ #aborted = false;
+ #req;
url;
_construct(callback) {
// TODO: streaming
- if (this._no_body) {
+ if (this.#noBody) {
callback();
return;
}
- (async () => {
- try {
- this._body = Buffer.from(await this._req.arrayBuffer());
+ const contentLength = this.#req.headers.get("content-length");
+ const length = contentLength ? parseInt(contentLength, 10) : 0;
- callback();
- } catch (err) {
- callback(err);
- }
- })();
+ if (length === 0) {
+ this.#noBody = true;
+ callback();
+ return;
+ }
+
+ callback();
+ }
+
+ #closeBodyStream() {
+ var bodyStream = this.#bodyStream;
+ if (bodyStream == null) return;
+ this.complete = true;
+ this.#bodyStream = undefined;
+ this.push(null);
+ // process.nextTick(destroyBodyStreamNT, bodyStream);
}
_read(size) {
- if (this._no_body) {
+ if (this.#noBody) {
this.push(null);
this.complete = true;
+ } else if (this.#bodyStream === null) {
+ const contentLength = this.#req.headers.get("content-length");
+ var remaining = contentLength ? parseInt(contentLength, 10) : 0;
+ this.#bodyStream = Readable.fromWeb(this.#req.body, {
+ highWaterMark: Number.isFinite(remaining)
+ ? Math.min(remaining, 16384)
+ : 16384,
+ });
+
+ this.#bodyStream.on("data", (chunk) => {
+ this.push(chunk);
+ remaining -= chunk?.byteLength ?? 0;
+ if (remaining <= 0) {
+ this.#closeBodyStream();
+ }
+ });
+ this.#bodyStream.on("end", () => {
+ this.#closeBodyStream();
+ });
} else {
- if (this._body_offset >= this._body.length) {
- this.push(null);
- this.complete = true;
- } else {
- this.push(
- this._body.subarray(this._body_offset, (this._body_offset += size)),
- );
- }
+ // this.#bodyStream.read(size);
}
}
get aborted() {
- return false;
+ return this.#aborted;
}
abort() {
- throw new Error("not implemented");
+ if (this.#aborted) return;
+ this.#aborted = true;
+
+ this.#closeBodyStream();
}
get connection() {
@@ -217,10 +246,10 @@ export class IncomingMessage extends Readable {
}
get socket() {
- var _socket = this._socket;
+ var _socket = this.#socket;
if (_socket) return _socket;
- this._socket = _socket = new EventEmitter();
+ this.#socket = _socket = new EventEmitter();
this.on("end", () => _socket.emit("end"));
this.on("close", () => _socket.emit("close"));
@@ -245,6 +274,7 @@ export class ServerResponse extends Writable {
this.#controller = undefined;
this.#firstWrite = undefined;
this._writableState.decodeStrings = false;
+ this.#deferred = undefined;
}
req;
@@ -260,6 +290,8 @@ export class ServerResponse extends Writable {
_defaultKeepAlive = false;
_removedConnection = false;
_removedContLen = false;
+ #deferred = undefined;
+ #finished = false;
#fakeSocket;
@@ -307,6 +339,11 @@ export class ServerResponse extends Writable {
if (firstWrite) controller.write(firstWrite);
firstWrite = undefined;
run(controller);
+ if (!this.#finished) {
+ return new Promise((resolve) => {
+ this.#deferred = resolve;
+ });
+ }
},
}),
{
@@ -322,6 +359,7 @@ export class ServerResponse extends Writable {
if (!this.headersSent) {
var data = this.#firstWrite || "";
this.#firstWrite = undefined;
+ this.#finished = true;
this._reply(
new Response(data, {
headers: this.#headers,
@@ -333,9 +371,16 @@ export class ServerResponse extends Writable {
return;
}
+ this.#finished = true;
this.#ensureReadableStreamController((controller) => {
- controller.close();
+ controller.end();
+
callback();
+ var deferred = this.#deferred;
+ if (deferred) {
+ this.#deferred = undefined;
+ deferred();
+ }
});
}
diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig
index 14cee4e36..e2d197073 100644
--- a/src/bun.js/node/syscall.zig
+++ b/src/bun.js/node/syscall.zig
@@ -204,6 +204,7 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode)
// That error is not unreachable for us
pub fn close(fd: std.os.fd_t) ?Syscall.Error {
log("close({d})", .{fd});
+ std.debug.assert(fd != JSC.Node.invalid_fd);
if (comptime Environment.isMac) {
// This avoids the EINTR problem.
return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) {
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 7e52312ad..28481dbb3 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -5764,7 +5764,12 @@ function createNativeStream(nativeType, Readable) {
#internalConstruct(ptr) {
this.#constructed = true;
- start(ptr, this.#highWaterMark);
+ const result = start(ptr, this.#highWaterMark);
+
+ if (typeof result === "number" && result > 1) {
+ this.#hasResized = true;
+ this.#highWaterMark = Math.min(this.#highWaterMark, result);
+ }
if (drainFn) {
const drainResult = drainFn(ptr);
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 7bb1c8f17..cac5fbafd 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -2929,6 +2929,18 @@ pub const ByteBlobLoader = struct {
bun.default_allocator.destroy(this);
}
+ pub fn drain(this: *ByteBlobLoader) bun.ByteList {
+ var temporary = this.store.sharedView();
+ temporary = temporary[this.offset..];
+ temporary = temporary[0..@minimum(16384, @minimum(temporary.len, this.remain))];
+
+ var cloned = bun.ByteList.init(temporary).listManaged(bun.default_allocator).clone() catch @panic("Out of memory");
+ this.offset +|= @truncate(Blob.SizeType, cloned.items.len);
+ this.remain -|= @truncate(Blob.SizeType, cloned.items.len);
+
+ return bun.ByteList.fromList(cloned);
+ }
+
pub const Source = ReadableStreamSource(
@This(),
"ByteBlob",
@@ -2937,7 +2949,7 @@ pub const ByteBlobLoader = struct {
onCancel,
deinit,
null,
- null,
+ drain,
);
};
diff --git a/test/bun.js/node-http.test.ts b/test/bun.js/node-http.test.ts
new file mode 100644
index 000000000..24b3f1c85
--- /dev/null
+++ b/test/bun.js/node-http.test.ts
@@ -0,0 +1,72 @@
+import { describe, expect, it } from "bun:test";
+import { createServer } from "node:http";
+
+describe("node:http", () => {
+ describe("createServer", async () => {
+ it("hello world", async () => {
+ const server = createServer((req, res) => {
+ res.writeHead(200, { "Content-Type": "text/plain" });
+ res.end("Hello World");
+ });
+ server.listen(8123);
+
+ const res = await fetch("http://localhost:8123");
+ expect(await res.text()).toBe("Hello World");
+ server.close();
+ });
+
+ it("request & response body streaming (large)", async () => {
+ const bodyBlob = new Blob(["hello world", "hello world".repeat(9000)]);
+
+ const input = await bodyBlob.text();
+
+ const server = createServer((req, res) => {
+ res.writeHead(200, { "Content-Type": "text/plain" });
+ req.on("data", (chunk) => {
+ res.write(chunk);
+ });
+
+ req.on("end", () => {
+ res.end();
+ });
+ });
+ server.listen(8124);
+
+ const res = await fetch("http://localhost:8124", {
+ method: "POST",
+ body: bodyBlob,
+ });
+
+ const out = await res.text();
+ expect(out).toBe(input);
+ server.close();
+ });
+
+ it("request & response body streaming (small)", async () => {
+ const bodyBlob = new Blob(["hello world", "hello world".repeat(4)]);
+
+ const input = await bodyBlob.text();
+
+ const server = createServer((req, res) => {
+ res.writeHead(200, { "Content-Type": "text/plain" });
+ req.on("data", (chunk) => {
+ res.write(chunk);
+ });
+
+ req.on("end", () => {
+ res.end();
+ });
+ });
+ server.listen(8125);
+
+ const res = await fetch("http://localhost:8125", {
+ method: "POST",
+ body: bodyBlob,
+ });
+
+ const out = await res.text();
+ expect(out).toBe(input);
+ server.close();
+ });
+ });
+});
diff --git a/test/bun.js/string-decoder.test.js b/test/bun.js/string-decoder.test.js
index 4664cc388..a29577acc 100644
--- a/test/bun.js/string-decoder.test.js
+++ b/test/bun.js/string-decoder.test.js
@@ -1,5 +1,9 @@
import { expect, it } from "bun:test";
-import { StringDecoder } from "string_decoder";
+var { StringDecoder } = require("string_decoder");
+
+it("require('string_decoder')", () => {
+ expect(StringDecoder1).toBe(StringDecoder);
+});
it("StringDecoder-utf8", () => {
test("utf-8", Buffer.from("$", "utf-8"), "$");