author     Jarred Sumner <jarred@jarredsumner.com>   2023-08-23 14:05:05 -0700
committer  GitHub <noreply@github.com>               2023-08-23 14:05:05 -0700
commit     c60385716b7a7ac9f788cdf7dfe37250321e0670 (patch)
tree       b08cc97e7e9d456efac7ec83d4862c8a8e3043bf
parent     f3266ff436e0ed2aedd0d81f47a1ef104191a2c9 (diff)
download   bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.gz
           bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.zst
           bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.zip
Bunch of streams fixes (#4251)
* Update WebKit
* Don't do async hooks work when async hooks are not enabled
* Smarter scheduling of event loop tasks with the HTTP server
* Less exciting approach
* Bump WebKit
* Another approach
* Fix body-stream tests
* Fixes #1886
* Fix UAF in fetch body streaming
* Missing from commit
* Fix leak
* Fix the other leak
* Fix test
* Fix crash
* Add missing dupeRef
* Make this code clearer
* Ignore empty chunks
* Fixes #3969
* Delete flaky test
* Update bun-linux-build.yml
* Fix memory issue
* Fix result body and .done status before the last callback; don't touch headers after they have been sent once
* Refactor HTTPClientResult
* Make corrupted-body test less flaky
* Fix mutex invalid state
* Fix onProgressUpdate deinit/unlock
* Remove verbose logging
* Fix possible null use
* Avoid null http
* Metadata can still be used in onReject after toResponse
* Don't leak task.http
* Fix flaky tests
* Make close tests less flaky

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: cirospaciari <ciro.spaciari@gmail.com>
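For context, the server-side streaming paths touched in src/bun.js/api/server.zig and src/bun.js/webcore/streams.zig are the ones hit by handlers that return a ReadableStream-backed Response, where status and headers must be flushed on the first write and later chunks may arrive only after microtasks run. A minimal sketch of such a handler (illustrative only, not part of this patch; the port and payload are arbitrary):

const server = Bun.serve({
  port: 0, // any free port; arbitrary choice for the sketch
  fetch(_req) {
    const stream = new ReadableStream({
      async pull(controller) {
        controller.enqueue(new TextEncoder().encode("hello "));
        await Bun.sleep(10); // later chunks arrive after the handler has returned
        controller.enqueue(new TextEncoder().encode("world"));
        controller.close();
      },
    });
    return new Response(stream, { status: 200, headers: { "Content-Type": "text/plain" } });
  },
});
console.log(`listening on http://localhost:${server.port}`);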
-rw-r--r--  .github/workflows/bun-linux-aarch64.yml                |   2
-rw-r--r--  .github/workflows/bun-linux-build.yml                  |   4
-rw-r--r--  .github/workflows/bun-mac-aarch64.yml                  |  16
-rw-r--r--  .github/workflows/bun-mac-x64-baseline.yml             |  16
-rw-r--r--  .github/workflows/bun-mac-x64.yml                      |  16
-rw-r--r--  Dockerfile                                             |   2
-rwxr-xr-x  bun.lockb                                              | bin 73331 -> 73331 bytes
-rw-r--r--  package.json                                           |   2
m---------  src/bun.js/WebKit                                      |   0
-rw-r--r--  src/bun.js/api/server.zig                              | 295
-rw-r--r--  src/bun.js/bindings/AsyncContextFrame.cpp              |   8
-rw-r--r--  src/bun.js/bindings/ZigGlobalObject.cpp                |  14
-rw-r--r--  src/bun.js/event_loop.zig                              |  49
-rw-r--r--  src/bun.js/javascript.zig                              |   4
-rw-r--r--  src/bun.js/webcore/body.zig                            |  19
-rw-r--r--  src/bun.js/webcore/request.zig                         |  20
-rw-r--r--  src/bun.js/webcore/response.zig                        | 108
-rw-r--r--  src/bun.js/webcore/streams.zig                         | 125
-rw-r--r--  src/cli/test_command.zig                               |   4
-rw-r--r--  src/http_client_async.zig                              | 229
-rw-r--r--  src/install/install.zig                                |   4
-rw-r--r--  src/js/builtins/ReadableStreamDefaultReader.ts         |  14
-rw-r--r--  src/js/node/async_hooks.ts                             |   7
-rw-r--r--  src/js/out/InternalModuleRegistryConstants.h           |   6
-rwxr-xr-x  test/bun.lockb                                         | bin 153785 -> 153669 bytes
-rw-r--r--  test/js/bun/http/fetch-file-upload.test.ts             |  98
-rw-r--r--  test/js/bun/http/serve.test.ts                         | 115
-rw-r--r--  test/js/bun/stream/direct-readable-stream.test.tsx     |   7
-rw-r--r--  test/js/third_party/napi_create_external/napi-create-external.test.ts | 195
-rw-r--r--  test/js/web/fetch/fetch.stream.test.ts                 |  31
-rw-r--r--  test/package.json                                      |   4
31 files changed, 866 insertions, 548 deletions
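On the fetch side, the changes to src/http_client_async.zig, src/bun.js/webcore/response.zig, and test/js/web/fetch/fetch.stream.test.ts cover incremental reads of a response body. A sketch of that consumer pattern (illustrative; the URL is a placeholder):

const res = await fetch("http://localhost:3000/big-file"); // placeholder endpoint
const reader = res.body!.getReader();
let received = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  received += value.length; // each chunk is a Uint8Array
}
console.log(`status ${res.status}, received ${received} bytes`);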
diff --git a/.github/workflows/bun-linux-aarch64.yml b/.github/workflows/bun-linux-aarch64.yml
index 4d08f59e0..816da7879 100644
--- a/.github/workflows/bun-linux-aarch64.yml
+++ b/.github/workflows/bun-linux-aarch64.yml
@@ -36,7 +36,7 @@ jobs:
arch: aarch64
build_arch: arm64
runner: linux-arm64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-linux-arm64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-linux-arm64-lto.tar.gz"
webkit_basename: "bun-webkit-linux-arm64-lto"
build_machine_arch: aarch64
diff --git a/.github/workflows/bun-linux-build.yml b/.github/workflows/bun-linux-build.yml
index 3114d46bc..4e1ba20de 100644
--- a/.github/workflows/bun-linux-build.yml
+++ b/.github/workflows/bun-linux-build.yml
@@ -46,7 +46,7 @@ jobs:
arch: x86_64
build_arch: amd64
runner: big-ubuntu
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-linux-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-linux-amd64-lto.tar.gz"
webkit_basename: "bun-webkit-linux-amd64-lto"
build_machine_arch: x86_64
- cpu: nehalem
@@ -54,7 +54,7 @@ jobs:
arch: x86_64
build_arch: amd64
runner: big-ubuntu
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-linux-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-linux-amd64-lto.tar.gz"
webkit_basename: "bun-webkit-linux-amd64-lto"
build_machine_arch: x86_64
diff --git a/.github/workflows/bun-mac-aarch64.yml b/.github/workflows/bun-mac-aarch64.yml
index 1d24bf705..9cada9606 100644
--- a/.github/workflows/bun-mac-aarch64.yml
+++ b/.github/workflows/bun-mac-aarch64.yml
@@ -117,7 +117,7 @@ jobs:
# obj: bun-obj-darwin-x64-baseline
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: true
# compile_obj: false
# - cpu: haswell
@@ -126,7 +126,7 @@ jobs:
# obj: bun-obj-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: true
# compile_obj: false
# - cpu: nehalem
@@ -135,7 +135,7 @@ jobs:
# obj: bun-obj-darwin-x64-baseline
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: false
# compile_obj: true
# - cpu: haswell
@@ -144,7 +144,7 @@ jobs:
# obj: bun-obj-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: false
# compile_obj: true
- cpu: native
@@ -152,7 +152,7 @@ jobs:
tag: bun-darwin-aarch64
obj: bun-obj-darwin-aarch64
artifact: bun-obj-darwin-aarch64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-arm64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-arm64-lto.tar.gz"
runner: macos-arm64
dependencies: true
compile_obj: true
@@ -257,7 +257,7 @@ jobs:
# package: bun-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# - cpu: haswell
# arch: x86_64
# tag: bun-darwin-x64
@@ -265,14 +265,14 @@ jobs:
# package: bun-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
- cpu: native
arch: aarch64
tag: bun-darwin-aarch64
obj: bun-obj-darwin-aarch64
package: bun-darwin-aarch64
artifact: bun-obj-darwin-aarch64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-arm64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-arm64-lto.tar.gz"
runner: macos-arm64
steps:
- uses: actions/checkout@v3
diff --git a/.github/workflows/bun-mac-x64-baseline.yml b/.github/workflows/bun-mac-x64-baseline.yml
index 800989bbc..441c2b8bc 100644
--- a/.github/workflows/bun-mac-x64-baseline.yml
+++ b/.github/workflows/bun-mac-x64-baseline.yml
@@ -117,7 +117,7 @@ jobs:
obj: bun-obj-darwin-x64-baseline
runner: macos-11
artifact: bun-obj-darwin-x64-baseline
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
dependencies: true
compile_obj: false
# - cpu: haswell
@@ -126,7 +126,7 @@ jobs:
# obj: bun-obj-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: true
# compile_obj: false
- cpu: nehalem
@@ -135,7 +135,7 @@ jobs:
obj: bun-obj-darwin-x64-baseline
runner: macos-11
artifact: bun-obj-darwin-x64-baseline
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
dependencies: false
compile_obj: true
# - cpu: haswell
@@ -144,7 +144,7 @@ jobs:
# obj: bun-obj-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: false
# compile_obj: true
# - cpu: native
@@ -152,7 +152,7 @@ jobs:
# tag: bun-darwin-aarch64
# obj: bun-obj-darwin-aarch64
# artifact: bun-obj-darwin-aarch64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# runner: macos-arm64
# dependencies: true
# compile_obj: true
@@ -258,7 +258,7 @@ jobs:
package: bun-darwin-x64
runner: macos-11
artifact: bun-obj-darwin-x64-baseline
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# - cpu: haswell
# arch: x86_64
# tag: bun-darwin-x64
@@ -266,14 +266,14 @@ jobs:
# package: bun-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# - cpu: native
# arch: aarch64
# tag: bun-darwin-aarch64
# obj: bun-obj-darwin-aarch64
# package: bun-darwin-aarch64
# artifact: bun-obj-darwin-aarch64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# runner: macos-arm64
steps:
- uses: actions/checkout@v3
diff --git a/.github/workflows/bun-mac-x64.yml b/.github/workflows/bun-mac-x64.yml
index 74a8ca5c0..5c494a935 100644
--- a/.github/workflows/bun-mac-x64.yml
+++ b/.github/workflows/bun-mac-x64.yml
@@ -117,7 +117,7 @@ jobs:
# obj: bun-obj-darwin-x64-baseline
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: true
# compile_obj: false
- cpu: haswell
@@ -126,7 +126,7 @@ jobs:
obj: bun-obj-darwin-x64
runner: macos-11
artifact: bun-obj-darwin-x64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
dependencies: true
compile_obj: false
# - cpu: nehalem
@@ -135,7 +135,7 @@ jobs:
# obj: bun-obj-darwin-x64-baseline
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# dependencies: false
# compile_obj: true
- cpu: haswell
@@ -144,7 +144,7 @@ jobs:
obj: bun-obj-darwin-x64
runner: macos-11
artifact: bun-obj-darwin-x64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
dependencies: false
compile_obj: true
# - cpu: native
@@ -152,7 +152,7 @@ jobs:
# tag: bun-darwin-aarch64
# obj: bun-obj-darwin-aarch64
# artifact: bun-obj-darwin-aarch64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-arm64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-arm64-lto.tar.gz"
# runner: macos-arm64
# dependencies: true
# compile_obj: true
@@ -260,7 +260,7 @@ jobs:
# package: bun-darwin-x64
# runner: macos-11
# artifact: bun-obj-darwin-x64-baseline
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
- cpu: haswell
arch: x86_64
tag: bun-darwin-x64
@@ -268,14 +268,14 @@ jobs:
package: bun-darwin-x64
runner: macos-11
artifact: bun-obj-darwin-x64
- webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-amd64-lto.tar.gz"
+ webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-amd64-lto.tar.gz"
# - cpu: native
# arch: aarch64
# tag: bun-darwin-aarch64
# obj: bun-obj-darwin-aarch64
# package: bun-darwin-aarch64
# artifact: bun-obj-darwin-aarch64
- # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-4/bun-webkit-macos-arm64-lto.tar.gz"
+ # webkit_url: "https://github.com/oven-sh/WebKit/releases/download/2023-aug3-5/bun-webkit-macos-arm64-lto.tar.gz"
# runner: macos-arm64
steps:
- uses: actions/checkout@v3
diff --git a/Dockerfile b/Dockerfile
index c568db5ad..7599ea818 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ ARG ARCH=x86_64
ARG BUILD_MACHINE_ARCH=x86_64
ARG TRIPLET=${ARCH}-linux-gnu
ARG BUILDARCH=amd64
-ARG WEBKIT_TAG=2023-aug3-4
+ARG WEBKIT_TAG=2023-aug3-5
ARG ZIG_TAG=jul1
ARG ZIG_VERSION="0.11.0-dev.4006+bf827d0b5"
ARG WEBKIT_BASENAME="bun-webkit-linux-$BUILDARCH"
diff --git a/bun.lockb b/bun.lockb
index 391d3d191..c3e011117 100755
--- a/bun.lockb
+++ b/bun.lockb
Binary files differ
diff --git a/package.json b/package.json
index ba3a9a0b0..c4268a6cc 100644
--- a/package.json
+++ b/package.json
@@ -25,7 +25,7 @@
"@types/react": "^18.0.25",
"@typescript-eslint/eslint-plugin": "^5.31.0",
"@typescript-eslint/parser": "^5.31.0",
- "bun-webkit": "0.0.1-fd79ce3120a692f4aed314c3da3dd452b4aa865f"
+ "bun-webkit": "0.0.1-48c1316e907ca597e27e5a7624160dc18a4df8ec"
},
"version": "0.0.0",
"prettier": "./.prettierrc.cjs"
diff --git a/src/bun.js/WebKit b/src/bun.js/WebKit
-Subproject fd79ce3120a692f4aed314c3da3dd452b4aa865
+Subproject 48c1316e907ca597e27e5a7624160dc18a4df8e
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 45c82b9fa..d6a9e1c6e 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1016,7 +1016,7 @@ fn NewFlags(comptime debug_mode: bool) type {
is_transfer_encoding: bool = false,
/// Used to identify if request can be safely deinitialized
- is_waiting_body: bool = false,
+ is_waiting_for_request_body: bool = false,
/// Used in renderMissing in debug mode to show the user an HTML page
/// Used to avoid looking at the uws.Request struct after it's been freed
is_web_browser_navigation: if (debug_mode) bool else void = if (debug_mode) false else {},
@@ -1080,9 +1080,21 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
/// When the response body is a temporary value
response_buf_owned: std.ArrayListUnmanaged(u8) = .{},
+ /// Defer finalization until after the request handler task is completed?
+ defer_deinit_until_callback_completes: ?*bool = null,
+
// TODO: support builtin compression
const can_sendfile = !ssl_enabled;
+ pub inline fn isAsync(this: *const RequestContext) bool {
+ return this.defer_deinit_until_callback_completes == null;
+ }
+
+ fn drainMicrotasks(this: *const RequestContext) void {
+ if (this.isAsync()) return;
+ this.server.vm.drainMicrotasks();
+ }
+
pub fn setAbortHandler(this: *RequestContext) void {
if (this.flags.has_abort_handler) return;
if (this.resp) |resp| {
@@ -1320,8 +1332,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn end(this: *RequestContext, data: []const u8, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.end(data, closeConnection);
@@ -1331,8 +1343,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endStream(closeConnection);
@@ -1342,8 +1354,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
pub fn endWithoutBody(this: *RequestContext, closeConnection: bool) void {
if (this.resp) |resp| {
- if (this.flags.is_waiting_body) {
- this.flags.is_waiting_body = false;
+ if (this.flags.is_waiting_for_request_body) {
+ this.flags.is_waiting_for_request_body = false;
resp.clearOnData();
}
resp.endWithoutBody(closeConnection);
@@ -1562,9 +1574,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// if we are waiting for the body yet and the request was not aborted we can safely clear the onData callback
if (this.resp) |resp| {
- if (this.flags.is_waiting_body and this.flags.aborted == false) {
+ if (this.flags.is_waiting_for_request_body and this.flags.aborted == false) {
resp.clearOnData();
- this.flags.is_waiting_body = false;
+ this.flags.is_waiting_for_request_body = false;
}
}
}
@@ -1576,6 +1588,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn deinit(this: *RequestContext) void {
+ if (this.defer_deinit_until_callback_completes) |defer_deinit| {
+ defer_deinit.* = true;
+ ctxLog("deferred deinit <d> ({*})<r>", .{this});
+ return;
+ }
+
ctxLog("deinit<d> ({*})<r>", .{this});
if (comptime Environment.allow_assert)
std.debug.assert(this.flags.finalized);
@@ -1953,6 +1971,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const StreamPair = struct { this: *RequestContext, stream: JSC.WebCore.ReadableStream };
+ fn handleFirstStreamWrite(this: *@This()) void {
+ if (!this.flags.has_written_status) {
+ this.renderMetadata();
+ }
+ }
+
fn doRenderStream(pair: *StreamPair) void {
var this = pair.this;
var stream = pair.stream;
@@ -1963,20 +1987,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
const resp = this.resp.?;
- // uWS automatically adds the status line if needed
- // we want to batch network calls as much as possible
- if (!(this.response_ptr.?.statusCode() == 200 and this.response_ptr.?.body.init.headers == null)) {
- this.renderMetadata();
- }
-
stream.value.ensureStillAlive();
var response_stream = this.allocator.create(ResponseStream.JSSink) catch unreachable;
+ var globalThis = this.server.globalThis;
response_stream.* = ResponseStream.JSSink{
.sink = .{
.res = resp,
.allocator = this.allocator,
.buffer = bun.ByteList{},
+ .onFirstWrite = @ptrCast(&handleFirstStreamWrite),
+ .ctx = this,
+ .globalThis = globalThis,
},
};
var signal = &response_stream.sink.signal;
@@ -1991,13 +2013,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// We are already corked!
const assignment_result: JSValue = ResponseStream.JSSink.assignToStream(
- this.server.globalThis,
+ globalThis,
stream.value,
response_stream,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
+
// assert that it was updated
std.debug.assert(!signal.isDead());
@@ -2015,32 +2038,18 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- stream.value.unprotect();
return this.handleReject(err_value);
}
- if (response_stream.sink.done or
- // TODO: is there a condition where resp could be freed before done?
- resp.hasResponded())
- {
+ if (resp.hasResponded()) {
if (!this.flags.aborted) resp.clearAborted();
- const wrote_anything = response_stream.sink.wrote > 0;
- streamLog("is done", .{});
- const responded = resp.hasResponded();
-
+ streamLog("done", .{});
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
- if (!responded and !wrote_anything and !this.flags.aborted) {
- this.renderMissing();
- return;
- } else if (wrote_anything and !responded and !this.flags.aborted) {
- this.endStream(this.shouldCloseConnection());
- }
-
+ this.endStream(this.shouldCloseConnection());
this.finalize();
stream.value.unprotect();
-
return;
}
@@ -2049,19 +2058,28 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
streamLog("returned a promise", .{});
- switch (promise.status(this.server.globalThis.vm())) {
+ this.drainMicrotasks();
+
+ switch (promise.status(globalThis.vm())) {
.Pending => {
streamLog("promise still Pending", .{});
+ if (!this.flags.has_written_status) {
+ response_stream.sink.onFirstWrite = null;
+ response_stream.sink.ctx = null;
+ this.renderMetadata();
+ }
+
// TODO: should this timeout?
this.setAbortHandler();
+ this.pending_promises_for_abort += 1;
this.response_ptr.?.body.value = .{
.Locked = .{
.readable = stream,
- .global = this.server.globalThis,
+ .global = globalThis,
},
};
assignment_result.then(
- this.server.globalThis,
+ globalThis,
this,
onResolveStream,
onRejectStream,
@@ -2071,11 +2089,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
.Fulfilled => {
streamLog("promise Fulfilled", .{});
+ defer stream.value.unprotect();
+
this.handleResolveStream();
},
.Rejected => {
streamLog("promise Rejected", .{});
- this.handleRejectStream(this.server.globalThis, promise.result(this.server.globalThis.vm()));
+ defer stream.value.unprotect();
+
+ this.handleRejectStream(globalThis, promise.result(globalThis.vm()));
},
}
return;
@@ -2084,22 +2106,23 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (this.flags.aborted) {
response_stream.detach();
- stream.cancel(this.server.globalThis);
- response_stream.sink.done = true;
+ stream.cancel(globalThis);
+ defer stream.value.unprotect();
+ response_stream.sink.markDone();
this.finalizeForAbort();
response_stream.sink.finalize();
- stream.value.unprotect();
return;
}
stream.value.ensureStillAlive();
+ defer stream.value.unprotect();
const is_in_progress = response_stream.sink.has_backpressure or !(response_stream.sink.wrote == 0 and
response_stream.sink.buffer.len == 0);
- if (!stream.isLocked(this.server.globalThis) and !is_in_progress) {
- if (JSC.WebCore.ReadableStream.fromJS(stream.value, this.server.globalThis)) |comparator| {
+ if (!stream.isLocked(globalThis) and !is_in_progress) {
+ if (JSC.WebCore.ReadableStream.fromJS(stream.value, globalThis)) |comparator| {
if (std.meta.activeTag(comparator.ptr) == std.meta.activeTag(stream.ptr)) {
streamLog("is not locked", .{});
this.renderMissing();
@@ -2111,7 +2134,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize();
- stream.value.unprotect();
}
const streamLog = Output.scoped(.ReadableStream, false);
@@ -2120,6 +2142,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return @intFromPtr(this.upgrade_context) == std.math.maxInt(usize);
}
+ fn toAsyncWithoutAbortHandler(ctx: *RequestContext, req: *uws.Request, request_object: *Request) void {
+ request_object.uws_request = req;
+
+ request_object.ensureURL() catch {
+ request_object.url = bun.String.empty;
+ };
+
+ // we have to clone the request headers here since they will soon belong to a different request
+ if (request_object.headers == null) {
+ request_object.headers = JSC.FetchHeaders.createFromUWS(ctx.server.globalThis, req);
+ }
+
+ // This object dies after the stack frame is popped
+ // so we have to clear it in here too
+ request_object.uws_request = null;
+ }
+
+ fn toAsync(
+ ctx: *RequestContext,
+ req: *uws.Request,
+ request_object: *Request,
+ ) void {
+ ctxLog("toAsync", .{});
+ ctx.toAsyncWithoutAbortHandler(req, request_object);
+ if (comptime debug_mode) {
+ ctx.pathname = request_object.url.clone();
+ }
+ ctx.setAbortHandler();
+ }
+
+ // Each HTTP request or TCP socket connection is effectively a "task".
+ //
+ // However, unlike the regular task queue, we don't drain the microtask
+ // queue at the end.
+ //
+ // Instead, we drain it multiple times, at the points that would
+ // otherwise "halt" the Response from being rendered.
+ //
+ // - If you return a Promise, we drain the microtask queue once
+ // - If you return a streaming Response, we drain the microtask queue (possibly the 2nd time this task!)
pub fn onResponse(
ctx: *RequestContext,
this: *ThisServer,
@@ -2128,8 +2190,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
request_value: JSValue,
response_value: JSValue,
) void {
+ _ = request_object;
+ _ = req;
request_value.ensureStillAlive();
response_value.ensureStillAlive();
+ ctx.drainMicrotasks();
if (ctx.flags.aborted) {
ctx.finalizeForAbort();
@@ -2159,6 +2224,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue = response_value;
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
@@ -2210,6 +2276,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
ctx.response_jsvalue.ensureStillAlive();
ctx.flags.response_protected = false;
ctx.response_ptr = response;
+
response.body.value.toBlobIfPossible();
switch (response.body.value) {
.Blob => |*blob| {
@@ -2236,35 +2303,10 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (wait_for_promise) {
- request_object.uws_request = req;
-
- request_object.ensureURL() catch {
- request_object.url = bun.String.empty;
- };
-
- // we have to clone the request headers here since they will soon belong to a different request
- if (request_object.headers == null) {
- request_object.headers = JSC.FetchHeaders.createFromUWS(this.globalThis, req);
- }
-
- if (comptime debug_mode) {
- ctx.pathname = request_object.url.clone();
- }
-
- // This object dies after the stack frame is popped
- // so we have to clear it in here too
- request_object.uws_request = null;
-
- ctx.setAbortHandler();
ctx.pending_promises_for_abort += 1;
-
response_value.then(this.globalThis, ctx, RequestContext.onResolve, RequestContext.onReject);
return;
}
- if (ctx.resp) |resp| {
- // The user returned something that wasn't a promise or a promise with a response
- if (!resp.hasResponded() and !ctx.flags.has_marked_pending) ctx.renderMissing();
- }
}
pub fn handleResolveStream(req: *RequestContext) void {
@@ -2276,6 +2318,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
wrapper.sink.done = true;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrote_anything = wrapper.sink.wrote > 0;
+
wrapper.sink.finalize();
wrapper.detach();
req.sink = null;
@@ -2300,12 +2343,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const responded = resp.hasResponded();
- if (!responded and !wrote_anything) {
- resp.clearAborted();
- req.renderMissing();
- return;
- } else if (!responded and wrote_anything) {
+ if (!responded) {
resp.clearAborted();
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
+ }
req.endStream(req.shouldCloseConnection());
}
@@ -2316,6 +2358,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onResolveStream", .{});
var args = callframe.arguments(2);
var req: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
req.handleResolveStream();
return JSValue.jsUndefined();
}
@@ -2323,19 +2366,19 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
streamLog("onRejectStream", .{});
const args = callframe.arguments(2);
var req = args.ptr[args.len - 1].asPromisePtr(@This());
+ req.pending_promises_for_abort -|= 1;
var err = args.ptr[0];
req.handleRejectStream(globalThis, err);
return JSValue.jsUndefined();
}
pub fn handleRejectStream(req: *@This(), globalThis: *JSC.JSGlobalObject, err: JSValue) void {
+ _ = globalThis;
streamLog("handleRejectStream", .{});
- var wrote_anything = req.flags.has_written_status;
if (req.sink) |wrapper| {
wrapper.sink.pending_flush = null;
wrapper.sink.done = true;
- wrote_anything = wrote_anything or wrapper.sink.wrote > 0;
req.flags.aborted = req.flags.aborted or wrapper.sink.aborted;
wrapper.sink.finalize();
wrapper.detach();
@@ -2350,40 +2393,32 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
- streamLog("onReject({any})", .{wrote_anything});
-
- //aborted so call finalizeForAbort
+ // aborted so call finalizeForAbort
if (req.flags.aborted) {
req.finalizeForAbort();
return;
}
- if (!err.isEmptyOrUndefinedOrNull() and !wrote_anything) {
- req.response_jsvalue.unprotect();
- req.response_jsvalue = JSValue.zero;
- req.handleReject(err);
- return;
- } else if (wrote_anything) {
- req.endStream(true);
- if (comptime debug_mode) {
- if (!err.isEmptyOrUndefinedOrNull()) {
- var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
- defer exception_list.deinit();
- req.server.vm.runErrorHandler(err, &exception_list);
- }
- }
- req.finalize();
- return;
+ streamLog("onReject()", .{});
+
+ if (!req.flags.has_written_status) {
+ req.renderMetadata();
}
- const fallback = JSC.SystemError{
- .code = bun.String.static(@as(string, @tagName(JSC.Node.ErrorCode.ERR_UNHANDLED_ERROR))),
- .message = bun.String.static("Unhandled error in ReadableStream"),
- };
- req.handleReject(fallback.toErrorInstance(globalThis));
+ req.endStream(true);
+ if (comptime debug_mode) {
+ if (!err.isEmptyOrUndefinedOrNull()) {
+ var exception_list: std.ArrayList(Api.JsException) = std.ArrayList(Api.JsException).init(req.allocator);
+ defer exception_list.deinit();
+ req.server.vm.runErrorHandler(err, &exception_list);
+ }
+ }
+ req.finalize();
}
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
+ this.drainMicrotasks();
+
// If a ReadableStream can trivially be converted to a Blob, do so.
// If it's a WTFStringImpl and it cannot be used as a UTF-8 string, convert it to a Blob.
value.toBlobIfPossible();
@@ -2833,8 +2868,13 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
std.debug.assert(this.resp == resp);
- this.flags.is_waiting_body = last == false;
+ this.flags.is_waiting_for_request_body = last == false;
if (this.flags.aborted or this.flags.has_marked_complete) return;
+ if (!last and chunk.len == 0) {
+ // Sometimes, we get back an empty chunk
+ // We have to ignore those chunks unless it's the last one
+ return;
+ }
if (this.request_body != null) {
var body = this.request_body.?;
@@ -2900,6 +2940,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
if (old == .Locked) {
+ defer this.drainMicrotasks();
+
old.resolve(&body.value, this.server.globalThis);
}
return;
@@ -3254,7 +3296,7 @@ pub const WebSocketServer = struct {
globalObject.throwInvalidArguments("websocket expects maxPayloadLength to be an integer", .{});
return null;
}
- server.maxPayloadLength = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.maxPayloadLength = @truncate(@max(value.toInt64(), 0));
}
}
@@ -3265,7 +3307,7 @@ pub const WebSocketServer = struct {
return null;
}
- var idleTimeout = @as(u16, @intCast(@as(u32, @truncate(@max(value.toInt64(), 0)))));
+ var idleTimeout: u16 = @truncate(@max(value.toInt64(), 0));
if (idleTimeout > 960) {
globalObject.throwInvalidArguments("websocket expects idleTimeout to be 960 or less", .{});
return null;
@@ -3285,7 +3327,7 @@ pub const WebSocketServer = struct {
return null;
}
- server.backpressureLimit = @as(u32, @intCast(@max(value.toInt64(), 0)));
+ server.backpressureLimit = @truncate(@max(value.toInt64(), 0));
}
}
@@ -5271,6 +5313,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
) void {
JSC.markBinding(@src());
this.pending_requests += 1;
+
req.setYield(false);
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
ctx.create(this, req, resp);
@@ -5337,7 +5380,8 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
.onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback,
},
};
- ctx.flags.is_waiting_body = true;
+ ctx.flags.is_waiting_for_request_body = true;
+
resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx);
}
}
@@ -5352,7 +5396,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5360,8 +5410,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ ctx.defer_deinit_until_callback_completes = null;
+
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn onWebSocketUpgrade(
@@ -5404,7 +5466,13 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
const request_value = args[0];
request_value.ensureStillAlive();
const response_value = this.config.onRequest.callWithThis(this.globalThis, this.thisObject, &args);
+ defer {
+ // uWS request will not live longer than this function
+ request_object.uws_request = null;
+ }
+ var should_deinit_context = false;
+ ctx.defer_deinit_until_callback_completes = &should_deinit_context;
ctx.onResponse(
this,
req,
@@ -5412,9 +5480,20 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
request_value,
response_value,
);
+ ctx.defer_deinit_until_callback_completes = null;
- // uWS request will not live longer than this function
- request_object.uws_request = null;
+ if (should_deinit_context) {
+ request_object.uws_request = null;
+ ctx.deinit();
+ return;
+ }
+
+ if (!ctx.flags.has_marked_complete and !ctx.flags.has_marked_pending and ctx.pending_promises_for_abort == 0 and !ctx.flags.is_waiting_for_request_body) {
+ ctx.renderMissing();
+ return;
+ }
+
+ ctx.toAsync(req, request_object);
}
pub fn listen(this: *ThisServer) void {
diff --git a/src/bun.js/bindings/AsyncContextFrame.cpp b/src/bun.js/bindings/AsyncContextFrame.cpp
index 2a103a8d1..1c541b2a8 100644
--- a/src/bun.js/bindings/AsyncContextFrame.cpp
+++ b/src/bun.js/bindings/AsyncContextFrame.cpp
@@ -97,10 +97,18 @@ extern "C" EncodedJSValue AsyncContextFrame__withAsyncContextIfNeeded(JSGlobalOb
// }
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args);
}
JSValue AsyncContextFrame::call(JSGlobalObject* global, JSValue functionObject, JSValue thisValue, const ArgList& args, NakedPtr<Exception>& returnedException)
{
+ if (LIKELY(!global->isAsyncContextTrackingEnabled())) {
+ return JSC::profiledCall(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
+ }
+
ASYNCCONTEXTFRAME_CALL_IMPL(global, ProfilingReason::API, functionObject, JSC::getCallData(functionObject), thisValue, args, returnedException);
}
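The fast path added above is only taken while async context tracking is disabled; code that opts into async context (for example via AsyncLocalStorage from node:async_hooks) is what pays for the wrapped call. A minimal sketch of such usage, assuming a runtime where context propagates across awaits (illustrative, not part of this patch):

import { AsyncLocalStorage } from "node:async_hooks";

const als = new AsyncLocalStorage<{ id: number }>();
als.run({ id: 1 }, async () => {
  await Promise.resolve(); // the stored context must survive this microtask boundary
  console.log(als.getStore()?.id); // expected: 1
});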
diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp
index 0ecafeae4..d3bd623dd 100644
--- a/src/bun.js/bindings/ZigGlobalObject.cpp
+++ b/src/bun.js/bindings/ZigGlobalObject.cpp
@@ -1411,6 +1411,16 @@ JSC_DEFINE_HOST_FUNCTION(asyncHooksCleanupLater, (JSC::JSGlobalObject * globalOb
return JSC::JSValue::encode(JSC::jsUndefined());
}
+JSC_DEFINE_HOST_FUNCTION(asyncHooksSetEnabled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
+{
+ // assumptions and notes:
+ // - nobody else uses setOnEachMicrotaskTick
+ // - this is called by js if we set async context in a way we may not clear it
+ // - AsyncLocalStorage.prototype.run cleans up after itself and does not call this cb
+ globalObject->setAsyncContextTrackingEnabled(callFrame->argument(0).toBoolean(globalObject));
+ return JSC::JSValue::encode(JSC::jsUndefined());
+}
+
extern "C" int Bun__ttySetMode(int fd, int mode);
JSC_DEFINE_HOST_FUNCTION(jsTTYSetMode, (JSC::JSGlobalObject * globalObject, CallFrame* callFrame))
@@ -1689,6 +1699,10 @@ static JSC_DEFINE_HOST_FUNCTION(functionLazyLoad,
if (string == "async_hooks"_s) {
auto* obj = constructEmptyObject(globalObject);
obj->putDirect(
+ vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "setAsyncHooksEnabled"_s)),
+ JSC::JSFunction::create(vm, globalObject, 0, "setAsyncHooksEnabled"_s, asyncHooksSetEnabled, ImplementationVisibility::Public), 0);
+
+ obj->putDirect(
vm, JSC::PropertyName(JSC::Identifier::fromString(vm, "cleanupLater"_s)),
JSC::JSFunction::create(vm, globalObject, 0, "cleanupLater"_s, asyncHooksCleanupLater, ImplementationVisibility::Public), 0);
return JSValue::encode(obj);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 92874b6a4..896297060 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -509,6 +509,7 @@ comptime {
}
}
+pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool);
pub const EventLoop = struct {
tasks: Queue = undefined,
concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{},
@@ -518,6 +519,7 @@ pub const EventLoop = struct {
start_server_on_next_tick: bool = false,
defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
forever_timer: ?*uws.Timer = null,
+ deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{},
pub const Queue = std.fifo.LinearFifo(Task, .Dynamic);
const log = bun.Output.scoped(.EventLoop, false);
@@ -528,6 +530,49 @@ pub const EventLoop = struct {
}
}
+ pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void {
+ vm.drainMicrotasks();
+ this.drainDeferredTasks();
+ }
+
+ pub fn drainMicrotasks(this: *EventLoop) void {
+ this.drainMicrotasksWithVM(this.global.vm());
+ }
+
+ pub fn ensureAliveForOneTick(this: *EventLoop) void {
+ if (this.noop_task.scheduled) return;
+ this.enqueueTask(Task.init(&this.noop_task));
+ this.noop_task.scheduled = true;
+ }
+
+ pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool {
+ const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable;
+ return existing.found_existing;
+ }
+
+ pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool {
+ return this.deferred_microtask_map.swapRemove(ctx);
+ }
+
+ fn drainDeferredTasks(this: *EventLoop) void {
+ var i: usize = 0;
+ var last = this.deferred_microtask_map.count();
+ while (i < last) {
+ var key = this.deferred_microtask_map.keys()[i] orelse {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ continue;
+ };
+
+ if (!this.deferred_microtask_map.values()[i](key)) {
+ this.deferred_microtask_map.swapRemoveAt(i);
+ last = this.deferred_microtask_map.count();
+ } else {
+ i += 1;
+ }
+ }
+ }
+
pub fn tickWithCount(this: *EventLoop) u32 {
var global = this.global;
var global_vm = global.vm();
@@ -621,7 +666,7 @@ pub const EventLoop = struct {
}
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
}
this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head;
@@ -758,7 +803,7 @@ pub const EventLoop = struct {
this.tickConcurrent();
} else {
global_vm.releaseWeakRefs();
- global_vm.drainMicrotasks();
+ this.drainMicrotasksWithVM(global_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
diff --git a/src/bun.js/javascript.zig b/src/bun.js/javascript.zig
index a016129e2..492b9fbee 100644
--- a/src/bun.js/javascript.zig
+++ b/src/bun.js/javascript.zig
@@ -1742,6 +1742,10 @@ pub const VirtualMachine = struct {
ret.success = true;
}
+ pub fn drainMicrotasks(this: *VirtualMachine) void {
+ this.eventLoop().drainMicrotasks();
+ }
+
pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {
switch (log.msgs.items.len) {
0 => {
diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig
index fa0ec9b24..86462dd04 100644
--- a/src/bun.js/webcore/body.zig
+++ b/src/bun.js/webcore/body.zig
@@ -249,10 +249,7 @@ pub const Body = struct {
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
value.action = action;
- if (value.readable) |readable| {
- // switch (readable.ptr) {
- // .JavaScript
- // }
+ if (value.readable) |readable| handle_stream: {
switch (action) {
.getFormData, .getText, .getJSON, .getBlob, .getArrayBuffer => {
value.promise = switch (action) {
@@ -261,6 +258,20 @@ pub const Body = struct {
.getText => globalThis.readableStreamToText(readable.value),
.getBlob => globalThis.readableStreamToBlob(readable.value),
.getFormData => |form_data| brk: {
+ if (value.onStartBuffering != null) {
+ if (readable.isDisturbed(globalThis)) {
+ form_data.?.deinit();
+ readable.value.unprotect();
+ value.readable = null;
+ value.action = .{ .none = {} };
+ return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createErrorInstance("ReadableStream is already used", .{}));
+ } else {
+ readable.detach(globalThis);
+ value.readable = null;
+ }
+
+ break :handle_stream;
+ }
defer {
form_data.?.deinit();
value.action.getFormData = null;
diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig
index c01e72d60..aaa3f6b79 100644
--- a/src/bun.js/webcore/request.zig
+++ b/src/bun.js/webcore/request.zig
@@ -485,7 +485,8 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
+
if (!req.url.isEmpty())
fields.insert(.url);
} else if (!url_or_object_type.isObject()) {
@@ -554,7 +555,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (!response.url.isEmpty()) {
- req.url = response.url;
+ req.url = response.url.dupeRef();
fields.insert(.url);
}
}
@@ -586,7 +587,7 @@ pub const Request = struct {
if (!fields.contains(.url)) {
if (value.fastGet(globalThis, .url)) |url| {
- req.url = bun.String.fromJS(url, globalThis);
+ req.url = bun.String.fromJS(url, globalThis).dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
@@ -599,7 +600,7 @@ pub const Request = struct {
_ = req.body.unref();
return null;
};
- req.url = str;
+ req.url = str.dupeRef();
if (!req.url.isEmpty())
fields.insert(.url);
}
@@ -648,9 +649,10 @@ pub const Request = struct {
return null;
}
- // Note that the string is going to be ref'd here, so we don't need to ref it above.
const href = JSC.URL.hrefFromString(req.url);
if (href.isEmpty()) {
+ // globalThis.throw can cause GC, which could cause the above string to be freed.
+ // so we must increment the reference count before calling it.
globalThis.throw("Failed to construct 'Request': Invalid URL \"{}\"", .{
req.url,
});
@@ -658,6 +660,14 @@ pub const Request = struct {
_ = req.body.unref();
return null;
}
+
+ // hrefFromString increments the reference count if they end up being
+ // the same
+ //
+ // we increment the reference count on usage above, so we must
+ // decrement it to be perfectly balanced.
+ req.url.deref();
+
req.url = href;
if (req.body.value == .Blob and
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 01ecfad36..d947a7d4e 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -617,6 +617,7 @@ pub const Fetch = struct {
http: ?*HTTPClient.AsyncHTTP = null,
result: HTTPClient.HTTPClientResult = .{},
+ metadata: ?HTTPClient.HTTPClientResult.ResultMetadata = .{},
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
@@ -641,7 +642,8 @@ pub const Fetch = struct {
url_proxy_buffer: []const u8 = "",
signal: ?*JSC.WebCore.AbortSignal = null,
- aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ signals: HTTPClient.Signals = .{},
+ signal_store: HTTPClient.Signals.Store = .{},
has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
@@ -702,11 +704,19 @@ pub const Fetch = struct {
this.request_headers.entries.deinit(bun.default_allocator);
this.request_headers.buf.deinit(bun.default_allocator);
this.request_headers = Headers{ .allocator = undefined };
- this.http.?.clearData();
- this.result.deinitMetadata();
+ if (this.http != null) {
+ this.http.?.clearData();
+ }
+
+ if (this.metadata != null) {
+ this.metadata.?.deinit();
+ this.metadata = null;
+ }
+
this.response_buffer.deinit();
this.response.deinit();
+
this.scheduled_response_buffer.deinit();
this.request_body.detach();
@@ -725,9 +735,13 @@ pub const Fetch = struct {
}
pub fn onBodyReceived(this: *FetchTasklet) void {
+ this.mutex.lock();
const success = this.result.isSuccess();
const globalThis = this.global_this;
defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+
if (!success or !this.result.has_more) {
var vm = globalThis.bunVM();
this.poll_ref.unref(vm);
@@ -831,43 +845,42 @@ pub const Fetch = struct {
pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
- this.mutex.lock();
- defer {
- this.has_schedule_callback.store(false, .Monotonic);
- this.mutex.unlock();
- }
-
if (this.is_waiting_body) {
return this.onBodyReceived();
}
+ this.mutex.lock();
const globalThis = this.global_this;
var ref = this.promise;
const promise_value = ref.value();
- defer ref.strong.deinit();
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
poll_ref.unref(vm);
this.clearData();
this.deinit();
return;
}
+ const promise = promise_value.asAnyPromise().?;
+ const tracker = this.tracker;
+ tracker.willDispatch(globalThis);
defer {
+ tracker.didDispatch(globalThis);
+ ref.strong.deinit();
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
if (!this.is_waiting_body) {
poll_ref.unref(vm);
this.clearData();
this.deinit();
}
}
-
- const promise = promise_value.asAnyPromise().?;
- const tracker = this.tracker;
- tracker.willDispatch(globalThis);
- defer tracker.didDispatch(globalThis);
const success = this.result.isSuccess();
const result = switch (success) {
true => this.onResolve(),
@@ -907,6 +920,16 @@ pub const Fetch = struct {
return JSC.WebCore.AbortSignal.createAbortError(JSC.ZigString.static("The user aborted a request"), &JSC.ZigString.Empty, this.global_this);
}
+ var path: bun.String = undefined;
+
+ if (this.metadata) |metadata| {
+ path = bun.String.create(metadata.href);
+ } else if (this.http) |http| {
+ path = bun.String.create(http.url.href);
+ } else {
+ path = bun.String.empty;
+ }
+
const fetch_error = JSC.SystemError{
.code = bun.String.static(@errorName(this.result.fail)),
.message = switch (this.result.fail) {
@@ -916,7 +939,7 @@ pub const Fetch = struct {
error.ConnectionRefused => bun.String.static("Unable to connect. Is the computer able to access the url?"),
else => bun.String.static("fetch() failed. For more information, pass `verbose: true` in the second argument to fetch()"),
},
- .path = bun.String.create(this.http.?.url.href),
+ .path = path,
};
return fetch_error.toErrorInstance(this.global_this);
@@ -927,7 +950,7 @@ pub const Fetch = struct {
if (this.http) |http| {
http.enableBodyStreaming();
}
- if (this.aborted.load(.Acquire)) {
+ if (this.signal_store.aborted.load(.Monotonic)) {
return JSC.WebCore.DrainResult{
.aborted = {},
};
@@ -1000,21 +1023,27 @@ pub const Fetch = struct {
}
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
- const http_response = this.result.response;
- this.is_waiting_body = this.result.has_more;
- return Response{
- .allocator = allocator,
- .url = bun.String.createAtomIfPossible(this.result.href),
- .status_text = bun.String.createAtomIfPossible(http_response.status),
- .redirected = this.result.redirected,
- .body = .{
- .init = .{
- .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
- .status_code = @as(u16, @truncate(http_response.status_code)),
+ // at this point we always should have metadata
+ std.debug.assert(this.metadata != null);
+ if (this.metadata) |metadata| {
+ const http_response = metadata.response;
+ this.is_waiting_body = this.result.has_more;
+ return Response{
+ .allocator = allocator,
+ .url = bun.String.createAtomIfPossible(metadata.href),
+ .status_text = bun.String.createAtomIfPossible(http_response.status),
+ .redirected = this.result.redirected,
+ .body = .{
+ .init = .{
+ .headers = FetchHeaders.createFromPicoHeaders(http_response.headers),
+ .status_code = @as(u16, @truncate(http_response.status_code)),
+ },
+ .value = this.toBodyValue(),
},
- .value = this.toBodyValue(),
- },
- };
+ };
+ }
+
+ @panic("fetch metadata should be provided");
}
pub fn onResolve(this: *FetchTasklet) JSValue {
@@ -1063,6 +1092,7 @@ pub const Fetch = struct {
.hostname = fetch_options.hostname,
.tracker = JSC.AsyncTaskTracker.init(jsc_vm),
};
+ fetch_tasklet.signals = fetch_tasklet.signal_store.to();
fetch_tasklet.tracker.didSchedule(globalThis);
@@ -1079,6 +1109,10 @@ pub const Fetch = struct {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
}
+ if (fetch_tasklet.signal == null) {
+ fetch_tasklet.signals.aborted = null;
+ }
+
fetch_tasklet.http.?.* = HTTPClient.AsyncHTTP.init(
allocator,
fetch_options.method,
@@ -1095,9 +1129,10 @@ pub const Fetch = struct {
fetch_tasklet,
),
proxy,
- if (fetch_tasklet.signal != null) &fetch_tasklet.aborted else null,
+
fetch_options.hostname,
fetch_options.redirect_type,
+ fetch_tasklet.signals,
);
if (fetch_options.redirect_type != FetchRedirect.follow) {
@@ -1108,7 +1143,7 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
// we wanna to return after headers are received
- fetch_tasklet.http.?.signalHeaderProgress();
+ fetch_tasklet.signal_store.header_progress.store(true, .Monotonic);
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -1127,7 +1162,7 @@ pub const Fetch = struct {
reason.ensureStillAlive();
this.abort_reason = reason;
reason.protect();
- this.aborted.store(true, .Monotonic);
+ this.signal_store.aborted.store(true, .Monotonic);
this.tracker.didCancel(this.global_this);
if (this.http != null) {
@@ -1180,11 +1215,14 @@ pub const Fetch = struct {
task.mutex.lock();
defer task.mutex.unlock();
task.result = result;
+ // metadata should be provided only once so we preserve it until we consume it
+ if (result.metadata) |metadata| {
+ task.metadata = metadata;
+ }
task.body_size = result.body_size;
const success = result.isSuccess();
task.response_buffer = result.body.?.*;
-
if (success) {
_ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
}
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 955d10ffb..771d34db0 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -1915,6 +1915,34 @@ pub const ArrayBufferSink = struct {
pub const JSSink = NewJSSink(@This(), "ArrayBufferSink");
};
+const AutoFlusher = struct {
+ registered: bool = false,
+
+ pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (this.auto_flusher.registered) return;
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ if (!this.auto_flusher.registered) return;
+ this.auto_flusher.registered = false;
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ }
+
+ pub fn unregisterDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(this.auto_flusher.registered);
+ std.debug.assert(vm.eventLoop().unregisterDeferredTask(this));
+ this.auto_flusher.registered = false;
+ }
+
+ pub fn registerDeferredMicrotaskWithTypeUnchecked(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
+ std.debug.assert(!this.auto_flusher.registered);
+ this.auto_flusher.registered = true;
+ std.debug.assert(!vm.eventLoop().registerDeferredTask(this, @ptrCast(&Type.onAutoFlush)));
+ }
+};
+
pub fn NewJSSink(comptime SinkType: type, comptime name_: []const u8) type {
return struct {
sink: SinkType,
@@ -2357,6 +2385,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
end_len: usize = 0,
aborted: bool = false,
+ onFirstWrite: ?*const fn (?*anyopaque) void = null,
+ ctx: ?*anyopaque = null,
+
+ auto_flusher: AutoFlusher = AutoFlusher{},
+
const log = Output.scoped(.HTTPServerWritable, false);
pub fn connect(this: *@This(), signal: Signal) void {
@@ -2375,15 +2408,25 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
}
+ fn handleFirstWriteIfNecessary(this: *@This()) void {
+ if (this.onFirstWrite) |onFirstWrite| {
+ var ctx = this.ctx;
+ this.ctx = null;
+ this.onFirstWrite = null;
+ onFirstWrite(ctx);
+ }
+ }
+
fn hasBackpressure(this: *const @This()) bool {
return this.has_backpressure;
}
- fn send(this: *@This(), buf: []const u8) bool {
+ fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool {
std.debug.assert(!this.done);
defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
if (this.requested_end and !this.res.state().isHttpWriteCalled()) {
+ this.handleFirstWriteIfNecessary();
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
return success;
@@ -2395,10 +2438,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// so in this scenario, we just append to the buffer
// and report success
if (this.requested_end) {
+ this.handleFirstWriteIfNecessary();
this.res.end(buf, false);
this.has_backpressure = false;
return true;
} else {
+ this.handleFirstWriteIfNecessary();
this.has_backpressure = !this.res.write(buf);
if (this.has_backpressure) {
this.res.onWritable(*@This(), onWritable, this);
@@ -2409,6 +2454,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
unreachable;
}
+ fn send(this: *@This(), buf: []const u8) bool {
+ this.unregisterAutoFlusher();
+ return this.sendWithoutAutoFlusher(buf);
+ }
+
fn readableSlice(this: *@This()) []const u8 {
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}
@@ -2464,7 +2514,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Node.Maybe(void) {
if (this.aborted or this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
return .{ .result = {} };
}
@@ -2529,6 +2579,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, wait: bool) JSC.Node.Maybe(JSValue) {
log("flushFromJS({any})", .{wait});
+ this.unregisterAutoFlusher();
+
if (!wait) {
return this.flushFromJSNoWait();
}
@@ -2563,12 +2615,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn flush(this: *@This()) JSC.Node.Maybe(void) {
log("flush()", .{});
+ this.unregisterAutoFlusher();
+
if (!this.hasBackpressure() or this.done) {
return .{ .result = {} };
}
if (this.res.hasResponded()) {
- this.done = true;
+ this.markDone();
this.signal.close(null);
}
@@ -2596,6 +2650,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
} else if (this.buffer.len + len >= this.highWaterMark) {
// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
@@ -2613,9 +2668,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2628,7 +2685,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2676,9 +2733,11 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
+ this.registerAutoFlusher();
return .{ .owned = len };
}
+ this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);
return .{ .owned = len };
@@ -2690,7 +2749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res.hasResponded()) {
this.signal.close(null);
- this.done = true;
+ this.markDone();
return .{ .done = {} };
}
@@ -2715,9 +2774,15 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.onWritable(*@This(), onWritable, this);
}
+ this.registerAutoFlusher();
return .{ .owned = @as(Blob.SizeType, @intCast(written)) };
}
+ pub fn markDone(this: *@This()) void {
+ this.done = true;
+ this.unregisterAutoFlusher();
+ }
+
// In this case, it's always an error
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Node.Maybe(void) {
log("end({any})", .{err});
@@ -2728,7 +2793,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = {} };
}
@@ -2739,7 +2804,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (readable.len == 0) {
this.signal.close(err);
- this.done = true;
+ this.markDone();
// we do not close the stream here
// this.res.endStream(false);
this.finalize();
@@ -2759,7 +2824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res.hasResponded()) {
this.requested_end = true;
this.signal.close(null);
- this.done = true;
+ this.markDone();
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(0) };
}
@@ -2780,10 +2845,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.res.end("", false);
}
- this.done = true;
+ this.markDone();
this.flushPromise();
this.signal.close(null);
- this.done = true;
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(this.wrote) };
@@ -2796,12 +2860,50 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn abort(this: *@This()) void {
log("onAborted()", .{});
this.done = true;
+ this.unregisterAutoFlusher();
+
this.aborted = true;
this.signal.close(null);
+
this.flushPromise();
this.finalize();
}
+ fn unregisterAutoFlusher(this: *@This()) void {
+ if (this.auto_flusher.registered)
+ AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ fn registerAutoFlusher(this: *@This()) void {
+ if (!this.auto_flusher.registered)
+ AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
+ }
+
+ pub fn onAutoFlush(this: *@This()) bool {
+ log("onAutoFlush()", .{});
+ if (this.done) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ const readable = this.readableSlice();
+
+ if (this.hasBackpressure() or readable.len == 0) {
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
+ if (!this.sendWithoutAutoFlusher(readable)) {
+ this.auto_flusher.registered = true;
+ this.res.onWritable(*@This(), onWritable, this);
+ return true;
+ }
+
+ this.handleWrote(readable.len);
+ this.auto_flusher.registered = false;
+ return false;
+ }
+
pub fn destroy(this: *@This()) void {
log("destroy()", .{});
var bytes = this.buffer.listManaged(this.allocator);
@@ -2810,6 +2912,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
bytes.deinit();
}
+ this.unregisterAutoFlusher();
+
this.allocator.destroy(this);
}
@@ -2820,6 +2924,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (!this.done) {
this.done = true;
+ this.unregisterAutoFlusher();
this.res.endStream(false);
}
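
The `AutoFlusher` added above registers a deferred event-loop task so that bytes buffered in `HTTPServerWritable` are flushed even when nothing on the JavaScript side calls `flush()`. As a hedged, user-level illustration of the pattern this affects (the port and chunk contents are made up, and this is not code from the commit):

```ts
// Illustrative sketch only: a streamed Bun.serve response. The chunks enqueued
// here land in the writable's buffer; the new auto-flusher drains that buffer
// from a deferred microtask even when JS never calls flush() explicitly.
const encoder = new TextEncoder();

const server = Bun.serve({
  port: 0,
  fetch() {
    const stream = new ReadableStream({
      async start(controller) {
        controller.enqueue(encoder.encode("chunk 1\n"));
        // Yield to the event loop so the deferred auto-flush task can run.
        await Bun.sleep(1);
        controller.enqueue(encoder.encode("chunk 2\n"));
        controller.close();
      },
    });
    return new Response(stream, { headers: { "Content-Type": "text/plain" } });
  },
});

console.log(`listening on http://${server.hostname}:${server.port}`);
```

The intent of the deferred task is that buffered chunks like these reach the socket on the next turn of the event loop instead of waiting for backpressure or an explicit flush.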
diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig
index 5bf48d7d9..53dd4c3c5 100644
--- a/src/cli/test_command.zig
+++ b/src/cli/test_command.zig
@@ -955,12 +955,12 @@ pub const TestCommand = struct {
}
{
- vm.global.vm().drainMicrotasks();
+ vm.drainMicrotasks();
var count = vm.unhandled_error_counter;
vm.global.handleRejectedPromises();
while (vm.unhandled_error_counter > count) {
count = vm.unhandled_error_counter;
- vm.global.vm().drainMicrotasks();
+ vm.drainMicrotasks();
vm.global.handleRejectedPromises();
}
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 725e960d6..26978db22 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -60,6 +60,31 @@ var shared_response_headers_buf: [256]picohttp.Header = undefined;
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
+pub const Signals = struct {
+ header_progress: ?*std.atomic.Atomic(bool) = null,
+ body_streaming: ?*std.atomic.Atomic(bool) = null,
+ aborted: ?*std.atomic.Atomic(bool) = null,
+
+ pub const Store = struct {
+ header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+
+ pub fn to(this: *Store) Signals {
+ return .{
+ .header_progress = &this.header_progress,
+ .body_streaming = &this.body_streaming,
+ .aborted = &this.aborted,
+ };
+ }
+ };
+
+ pub fn get(this: Signals, comptime field: std.meta.FieldEnum(Signals)) bool {
+ var ptr: *std.atomic.Atomic(bool) = @field(this, @tagName(field)) orelse return false;
+ return ptr.load(.Monotonic);
+ }
+};
+
pub const FetchRedirect = enum(u8) {
follow,
manual,
@@ -761,12 +786,12 @@ pub fn onOpen(
std.debug.assert(is_ssl == client.url.isHTTPS());
}
}
- if (client.aborted != null) {
+ if (client.signals.aborted != null) {
socket_async_http_abort_tracker.put(client.async_http_id, socket.socket) catch unreachable;
}
log("Connected {s} \n", .{client.url.href});
- if (client.hasSignalAborted()) {
+ if (client.signals.get(.aborted)) {
client.closeAndAbort(comptime is_ssl, socket);
return;
}
@@ -1012,6 +1037,7 @@ pub const InternalState = struct {
fail: anyerror = error.NoError,
request_stage: HTTPStage = .pending,
response_stage: HTTPStage = .pending,
+ metadata_sent: bool = false,
pub fn init(body: HTTPRequestBody, body_out_str: *MutableString) InternalState {
return .{
@@ -1153,13 +1179,17 @@ pub const InternalState = struct {
}
pub fn postProcessBody(this: *InternalState) usize {
- var response = &this.pending_response;
- // if it compressed with this header, it is no longer
- if (this.content_encoding_i < response.headers.len) {
- var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
- _ = mutable_headers.orderedRemove(this.content_encoding_i);
- response.headers = mutable_headers.items;
- this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
+
+ // we only touch the headers if we have not sent them yet
+ if (!this.metadata_sent) {
+ var response = &this.pending_response;
+ if (this.content_encoding_i < response.headers.len) {
+ // the body was decompressed, so this content-encoding header no longer applies
+ var mutable_headers = std.ArrayListUnmanaged(picohttp.Header){ .items = response.headers, .capacity = response.headers.len };
+ _ = mutable_headers.orderedRemove(this.content_encoding_i);
+ response.headers = mutable_headers.items;
+ this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
+ }
}
return this.body_out_str.?.list.items.len;
@@ -1201,11 +1231,9 @@ http_proxy: ?URL = null,
proxy_authorization: ?[]u8 = null,
proxy_tunneling: bool = false,
proxy_tunnel: ?ProxyTunnel = null,
-aborted: ?*std.atomic.Atomic(bool) = null,
+signals: Signals = .{},
async_http_id: u32 = 0,
hostname: ?[]u8 = null,
-signal_header_progress: *std.atomic.Atomic(bool),
-enable_body_stream: *std.atomic.Atomic(bool),
pub fn init(
allocator: std.mem.Allocator,
@@ -1213,10 +1241,8 @@ pub fn init(
url: URL,
header_entries: Headers.Entries,
header_buf: string,
- signal: ?*std.atomic.Atomic(bool),
hostname: ?[]u8,
- signal_header_progress: *std.atomic.Atomic(bool),
- enable_body_stream: *std.atomic.Atomic(bool),
+ signals: Signals,
) HTTPClient {
return HTTPClient{
.allocator = allocator,
@@ -1224,10 +1250,8 @@ pub fn init(
.url = url,
.header_entries = header_entries,
.header_buf = header_buf,
- .aborted = signal,
.hostname = hostname,
- .signal_header_progress = signal_header_progress,
- .enable_body_stream = enable_body_stream,
+ .signals = signals,
};
}
@@ -1384,8 +1408,7 @@ pub const AsyncHTTP = struct {
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
- signal_header_progress: std.atomic.Atomic(bool),
- enable_body_stream: std.atomic.Atomic(bool),
+ signals: Signals = .{},
pub var active_requests_count = std.atomic.Atomic(usize).init(0);
pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
@@ -1418,12 +1441,14 @@ pub const AsyncHTTP = struct {
pub fn signalHeaderProgress(this: *AsyncHTTP) void {
@fence(.Release);
- this.client.signal_header_progress.store(true, .Release);
+ var progress = this.signals.header_progress orelse return;
+ progress.store(true, .Release);
}
pub fn enableBodyStreaming(this: *AsyncHTTP) void {
@fence(.Release);
- this.client.enable_body_stream.store(true, .Release);
+ var stream = this.signals.body_streaming orelse return;
+ stream.store(true, .Release);
}
pub fn clearData(this: *AsyncHTTP) void {
@@ -1453,9 +1478,9 @@ pub const AsyncHTTP = struct {
timeout: usize,
callback: HTTPClientResult.Callback,
http_proxy: ?URL,
- signal: ?*std.atomic.Atomic(bool),
hostname: ?[]u8,
redirect_type: FetchRedirect,
+ signals: ?Signals,
) AsyncHTTP {
var this = AsyncHTTP{
.allocator = allocator,
@@ -1467,12 +1492,11 @@ pub const AsyncHTTP = struct {
.response_buffer = response_buffer,
.result_callback = callback,
.http_proxy = http_proxy,
- .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
- .signal_header_progress = std.atomic.Atomic(bool).init(false),
- .enable_body_stream = std.atomic.Atomic(bool).init(false),
+ .signals = signals orelse .{},
+ .async_http_id = if (signals != null and signals.?.aborted != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
};
- this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname, &this.signal_header_progress, &this.enable_body_stream);
+ this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, hostname, signals orelse this.signals);
this.client.async_http_id = this.async_http_id;
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
@@ -1544,7 +1568,21 @@ pub const AsyncHTTP = struct {
}
pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL, hostname: ?[]u8, redirect_type: FetchRedirect) AsyncHTTP {
- return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, http_proxy, null, hostname, redirect_type);
+ return @This().init(
+ allocator,
+ method,
+ url,
+ headers,
+ headers_buf,
+ response_buffer,
+ request_body,
+ timeout,
+ undefined,
+ http_proxy,
+ hostname,
+ redirect_type,
+ null,
+ );
}
fn reset(this: *AsyncHTTP) !void {
@@ -1646,8 +1684,10 @@ pub const AsyncHTTP = struct {
if (!result.isSuccess()) {
return result.fail;
}
-
- return result.response;
+ std.debug.assert(result.metadata != null);
+ if (result.metadata) |metadata| {
+ return metadata.response;
+ }
}
unreachable;
@@ -1659,33 +1699,34 @@ pub const AsyncHTTP = struct {
var callback = this.result_callback;
this.elapsed = http_thread.timer.read() -| this.elapsed;
this.redirected = this.client.remaining_redirect_count != default_redirect_count;
- if (!result.isSuccess()) {
+ if (result.isSuccess()) {
+ this.err = null;
+ if (result.metadata) |metadata| {
+ this.response = metadata.response;
+ }
+ this.state.store(.success, .Monotonic);
+ } else {
this.err = result.fail;
this.response = null;
this.state.store(State.fail, .Monotonic);
- } else {
- this.err = null;
- this.response = result.response;
- this.state.store(.success, .Monotonic);
}
if (result.has_more) {
callback.function(callback.ctx, result);
} else {
- this.client.deinit();
-
- this.real.?.* = this.*;
- this.real.?.response_buffer = this.response_buffer;
-
- log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+ {
+ this.client.deinit();
+ defer default_allocator.destroy(this);
+ this.real.?.* = this.*;
+ this.real.?.response_buffer = this.response_buffer;
- default_allocator.destroy(this);
+ log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+ callback.function(callback.ctx, result);
+ }
const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
std.debug.assert(active_requests > 0);
- callback.function(callback.ctx, result);
-
if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
http_thread.drainEvents();
}
@@ -1715,10 +1756,6 @@ pub const AsyncHTTP = struct {
}
};
-pub fn hasSignalAborted(this: *const HTTPClient) bool {
- return (this.aborted orelse return false).load(.Monotonic);
-}
-
pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
var header_count: usize = 0;
var header_entries = this.header_entries.slice();
@@ -1842,7 +1879,7 @@ pub fn doRedirect(this: *HTTPClient) void {
tunnel.deinit();
this.proxy_tunnel = null;
}
- if (this.aborted != null) {
+ if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
return this.start(.{ .bytes = "" }, body_out_str);
@@ -1874,7 +1911,7 @@ pub fn start(this: *HTTPClient, body: HTTPRequestBody, body_out_str: *MutableStr
fn start_(this: *HTTPClient, comptime is_ssl: bool) void {
// Aborted before connecting
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.fail(error.Aborted);
return;
}
@@ -1912,7 +1949,7 @@ fn printResponse(response: picohttp.Response) void {
}
pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
return;
}
@@ -2256,7 +2293,7 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
log("onData {}", .{incoming_data.len});
- if (this.hasSignalAborted()) {
+ if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
return;
}
@@ -2359,7 +2396,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
if (body_buf.len == 0) {
// no body data yet, but we can report the headers
- if (this.signal_header_progress.load(.Acquire)) {
+ if (this.signals.get(.header_progress)) {
this.progressUpdate(is_ssl, ctx, socket);
}
return;
@@ -2393,7 +2430,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
// if not reported we report partially now
- if (this.signal_header_progress.load(.Acquire)) {
+ if (this.signals.get(.header_progress)) {
this.progressUpdate(is_ssl, ctx, socket);
return;
}
@@ -2512,7 +2549,7 @@ pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo
}
fn fail(this: *HTTPClient, err: anyerror) void {
- if (this.aborted != null) {
+ if (this.signals.aborted != null) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
this.state.request_stage = .fail;
@@ -2561,7 +2598,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
if (this.state.stage != .done and this.state.stage != .fail) {
const is_done = this.state.isDone();
- if (this.aborted != null and is_done) {
+ if (this.signals.aborted != null and is_done) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
@@ -2585,22 +2622,17 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
} else if (!socket.isClosed()) {
socket.close(0, null);
}
+
this.state.reset();
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
this.proxy_tunneling = false;
- if (comptime print_every > 0) {
- print_every_i += 1;
- if (print_every_i % print_every == 0) {
- Output.prettyln("Heap stats for HTTP thread\n", .{});
- Output.flush();
- default_arena.dumpThreadStats();
- print_every_i = 0;
- }
- }
}
+
result.body.?.* = body;
+ callback.run(result);
+
if (comptime print_every > 0) {
print_every_i += 1;
if (print_every_i % print_every == 0) {
@@ -2610,25 +2642,39 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
print_every_i = 0;
}
}
- callback.run(result);
}
}
pub const HTTPClientResult = struct {
body: ?*MutableString = null,
- response: picohttp.Response = .{},
- metadata_buf: []u8 = &.{},
- href: []const u8 = "",
- fail: anyerror = error.NoError,
- redirected: bool = false,
- headers_buf: []picohttp.Header = &.{},
has_more: bool = false,
+ fail: anyerror = error.NoError,
+
+ metadata: ?ResultMetadata = null,
/// For Http Client requests
/// when Content-Length is provided this represents the whole size of the request
/// If chunked encoded this will represent the total received size (ignoring the chunk headers)
/// If is not chunked encoded and Content-Length is not provided this will be unknown
body_size: BodySize = .unknown,
+ redirected: bool = false,
+
+ pub const ResultMetadata = struct {
+ response: picohttp.Response = .{},
+ metadata_buf: []u8 = &.{},
+ href: []const u8 = "",
+ headers_buf: []picohttp.Header = &.{},
+
+ pub fn deinit(this: *ResultMetadata) void {
+ if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
+ if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
+ this.headers_buf = &.{};
+ this.metadata_buf = &.{};
+ this.href = "";
+ this.response.headers = &.{};
+ this.response.status = "";
+ }
+ };
pub const BodySize = union(enum) {
total_received: usize,
@@ -2648,17 +2694,6 @@ pub const HTTPClientResult = struct {
return this.fail == error.Aborted;
}
- pub fn deinitMetadata(this: *HTTPClientResult) void {
- if (this.metadata_buf.len > 0) bun.default_allocator.free(this.metadata_buf);
- if (this.headers_buf.len > 0) bun.default_allocator.free(this.headers_buf);
-
- this.headers_buf = &.{};
- this.metadata_buf = &.{};
- this.href = "";
- this.response.headers = &.{};
- this.response.status = "";
- }
-
pub const Callback = struct {
ctx: *anyopaque,
function: Function,
@@ -2694,14 +2729,26 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
.{ .content_length = content_length }
else
.{ .unknown = {} };
+ if (!this.state.metadata_sent) {
+ this.state.metadata_sent = true;
+ return HTTPClientResult{
+ .metadata = .{
+ .response = metadata.response,
+ .metadata_buf = metadata.owned_buf,
+ .href = metadata.url,
+ .headers_buf = metadata.response.headers,
+ },
+ .body = this.state.body_out_str,
+ .redirected = this.remaining_redirect_count != default_redirect_count,
+ .fail = this.state.fail,
+ .has_more = this.state.fail == error.NoError and !this.state.isDone(),
+ .body_size = body_size,
+ };
+ }
return HTTPClientResult{
.body = this.state.body_out_str,
- .response = metadata.response,
- .metadata_buf = metadata.owned_buf,
- .redirected = this.remaining_redirect_count != default_redirect_count,
- .href = metadata.url,
+ .metadata = null,
.fail = this.state.fail,
- .headers_buf = metadata.response.headers,
.has_more = this.state.fail == error.NoError and !this.state.isDone(),
.body_size = body_size,
};
@@ -2786,7 +2833,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
// done or streaming
const is_done = this.state.total_body_received >= content_length;
- if (is_done or this.enable_body_stream.load(.Acquire)) {
+ if (is_done or this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer.*);
if (this.progress_node) |progress| {
@@ -2848,7 +2895,7 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
progress.context.maybeRefresh();
}
// streaming chunks
- if (this.enable_body_stream.load(.Acquire)) {
+ if (this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer);
return processed > 0;
}
@@ -2927,7 +2974,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
try body_buffer.appendSliceExact(buffer);
// streaming chunks
- if (this.enable_body_stream.load(.Acquire)) {
+ if (this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(body_buffer.*);
return processed > 0;
}
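
With `Signals` and `ResultMetadata` in place, the client reports headers exactly once (guarded by `metadata_sent`) and then streams body chunks as further results. From JavaScript this corresponds to the usual streaming-fetch shape; a hedged sketch with a made-up URL:

```ts
// Illustrative sketch only: response metadata (status, headers) is available as
// soon as headers arrive, while the body keeps streaming afterwards.
const res = await fetch("http://localhost:3000/streaming-endpoint");

// Metadata: delivered once, up front.
console.log(res.status, res.headers.get("content-type"));

// Body: consumed incrementally as chunks arrive.
const reader = res.body!.getReader();
let received = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  received += value.byteLength;
}
console.log(`received ${received} bytes`);
```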
diff --git a/src/install/install.zig b/src/install/install.zig
index d444b62fc..b0cdf35c8 100644
--- a/src/install/install.zig
+++ b/src/install/install.zig
@@ -370,8 +370,8 @@ const NetworkTask = struct {
this.getCompletionCallback(),
this.package_manager.httpProxy(url),
null,
- null,
HTTP.FetchRedirect.follow,
+ null,
);
this.callback = .{
.package_manifest = .{
@@ -448,8 +448,8 @@ const NetworkTask = struct {
this.getCompletionCallback(),
this.package_manager.httpProxy(url),
null,
- null,
HTTP.FetchRedirect.follow,
+ null,
);
this.callback = .{ .extract = tarball };
}
diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts
index ea1a64b68..169806c52 100644
--- a/src/js/builtins/ReadableStreamDefaultReader.ts
+++ b/src/js/builtins/ReadableStreamDefaultReader.ts
@@ -104,9 +104,11 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
if ($getByIdDirectPrivate(controller, "closeRequested"))
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
- else if ($isReadableStreamDefaultController(controller))
+ else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
- else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+ } else if ($isReadableByteStreamController(controller)) {
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ }
return { value: outValues, size, done: false };
}
@@ -138,11 +140,13 @@ export function readMany(this: ReadableStreamDefaultReader): ReadableStreamDefau
var size = queue.size;
$resetQueue(queue);
- if ($getByIdDirectPrivate(controller, "closeRequested"))
+ if ($getByIdDirectPrivate(controller, "closeRequested")) {
$readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
- else if ($isReadableStreamDefaultController(controller))
+ } else if ($isReadableStreamDefaultController(controller)) {
$readableStreamDefaultControllerCallPullIfNeeded(controller);
- else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+ } else if ($isReadableByteStreamController(controller)) {
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ }
return { value: value, size: size, done: false };
};
diff --git a/src/js/node/async_hooks.ts b/src/js/node/async_hooks.ts
index 2a671b6a2..d04b226f8 100644
--- a/src/js/node/async_hooks.ts
+++ b/src/js/node/async_hooks.ts
@@ -21,7 +21,7 @@
// AsyncContextData is an immutable array managed in here, formatted [key, value, key, value] where
// each key is an AsyncLocalStorage object and the value is the associated value.
//
-const { cleanupLater } = $lazy("async_hooks");
+const { cleanupLater, setAsyncHooksEnabled } = $lazy("async_hooks");
function get(): ReadonlyArray<any> | undefined {
return $getInternalField($asyncContext, 0);
@@ -34,7 +34,9 @@ function set(contextValue: ReadonlyArray<any> | undefined) {
class AsyncLocalStorage {
#disableCalled = false;
- constructor() {}
+ constructor() {
+ setAsyncHooksEnabled(true);
+ }
static bind(fn, ...args: any) {
return this.snapshot().bind(null, fn, ...args);
@@ -160,6 +162,7 @@ class AsyncResource {
if (typeof type !== "string") {
throw new TypeError('The "type" argument must be of type string. Received type ' + typeof type);
}
+ setAsyncHooksEnabled(true);
this.type = type;
this.#snapshot = get();
}
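
The `async_hooks` change makes context tracking opt-in: `setAsyncHooksEnabled(true)` is only called once an `AsyncLocalStorage` or `AsyncResource` is constructed, so programs that never use them skip the bookkeeping. A hedged sketch of the public API that now triggers the lazy enable (standard Node API; nothing here is commit-specific beyond when tracking turns on):

```ts
import { AsyncLocalStorage } from "node:async_hooks";

// Constructing the storage is what lazily turns async-context tracking on.
const als = new AsyncLocalStorage<{ requestId: string }>();

function handler() {
  // The store set in run() is visible across awaits within the same async flow.
  console.log("requestId:", als.getStore()?.requestId);
}

als.run({ requestId: "req-1" }, async () => {
  await Promise.resolve();
  handler(); // prints "requestId: req-1"
});
```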
diff --git a/src/js/out/InternalModuleRegistryConstants.h b/src/js/out/InternalModuleRegistryConstants.h
index 20a7693b1..dea913eb4 100644
--- a/src/js/out/InternalModuleRegistryConstants.h
+++ b/src/js/out/InternalModuleRegistryConstants.h
@@ -30,7 +30,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//
@@ -263,7 +263,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//
@@ -497,7 +497,7 @@ static constexpr ASCIILiteral NodeAssertStrictCode = "(function (){\"use strict\
//
//
-static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
+static constexpr ASCIILiteral NodeAsyncHooksCode = "(function (){\"use strict\";// src/js/out/tmp/node/async_hooks.ts\nvar get = function() {\n return @getInternalField(@asyncContext, 0);\n}, set = function(contextValue) {\n return @putInternalField(@asyncContext, 0, contextValue);\n}, createWarning = function(message) {\n let warned = !1;\n var wrapped = function() {\n if (warned)\n return;\n if (new Error().stack.includes(\"zx/build/core.js\"))\n return;\n warned = !0, console.warn(\"[bun] Warning:\", message);\n };\n return wrapped;\n}, createHook = function(callbacks) {\n return {\n enable: createHookNotImpl,\n disable: createHookNotImpl\n };\n}, executionAsyncId = function() {\n return executionAsyncIdNotImpl(), 0;\n}, triggerAsyncId = function() {\n return 0;\n}, executionAsyncResource = function() {\n return executionAsyncResourceWarning(), process.stdin;\n}, $, { cleanupLater, setAsyncHooksEnabled } = globalThis[globalThis.Symbol.for('Bun.lazy')](\"async_hooks\");\n\nclass AsyncLocalStorage {\n #disableCalled = !1;\n constructor() {\n setAsyncHooksEnabled(!0);\n }\n static bind(fn, ...args) {\n return this.snapshot().bind(null, fn, ...args);\n }\n static snapshot() {\n var context = get();\n return (fn, ...args) => {\n var prev = get();\n set(context);\n try {\n return fn(...args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n };\n }\n enterWith(store) {\n cleanupLater();\n var context = get();\n if (!context) {\n set([this, store]);\n return;\n }\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n const clone = context.slice();\n clone[i + 1] = store, set(clone);\n return;\n }\n set(context.concat(this, store));\n }\n exit(cb, ...args) {\n return this.run(void 0, cb, ...args);\n }\n run(store, callback, ...args) {\n var context = get(), hasPrevious = !1, previous, i = 0, contextWasInit = !context;\n if (contextWasInit)\n set(context = [this, store]);\n else {\n if (context = context.slice(), i = context.indexOf(this), i > -1)\n hasPrevious = !0, previous = context[i + 1], context[i + 1] = store;\n else\n context.push(this, store);\n set(context);\n }\n try {\n return callback(...args);\n } catch (e) {\n throw e;\n } finally {\n if (!this.#disableCalled) {\n var context2 = get();\n if (context2 === context && contextWasInit)\n set(void 0);\n else if (context2 = context2.slice(), hasPrevious)\n context2[i + 1] = previous, set(context2);\n else\n context2.splice(i, 2), set(context2.length \? context2 : void 0);\n }\n }\n }\n disable() {\n if (!this.#disableCalled) {\n var context = get();\n if (context) {\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this) {\n context.splice(i, 2), set(context.length \? context : void 0);\n break;\n }\n }\n this.#disableCalled = !0;\n }\n }\n getStore() {\n var context = get();\n if (!context)\n return;\n var { length } = context;\n for (var i = 0;i < length; i += 2)\n if (context[i] === this)\n return context[i + 1];\n }\n}\n\nclass AsyncResource {\n type;\n #snapshot;\n constructor(type, options) {\n if (typeof type !== \"string\")\n @throwTypeError('The \"type\" argument must be of type string. 
Received type ' + typeof type);\n setAsyncHooksEnabled(!0), this.type = type, this.#snapshot = get();\n }\n emitBefore() {\n return !0;\n }\n emitAfter() {\n return !0;\n }\n asyncId() {\n return 0;\n }\n triggerAsyncId() {\n return 0;\n }\n emitDestroy() {\n }\n runInAsyncScope(fn, thisArg, ...args) {\n var prev = get();\n set(this.#snapshot);\n try {\n return fn.apply(thisArg, args);\n } catch (error) {\n throw error;\n } finally {\n set(prev);\n }\n }\n}\nvar createHookNotImpl = createWarning(\"async_hooks.createHook is not implemented in Bun. Hooks can still be created but will never be called.\"), executionAsyncIdNotImpl = createWarning(\"async_hooks.executionAsyncId/triggerAsyncId are not implemented in Bun. It will return 0 every time.\"), executionAsyncResourceWarning = createWarning(\"async_hooks.executionAsyncResource is not implemented in Bun. It returns a reference to process.stdin every time.\"), asyncWrapProviders = {\n NONE: 0,\n DIRHANDLE: 1,\n DNSCHANNEL: 2,\n ELDHISTOGRAM: 3,\n FILEHANDLE: 4,\n FILEHANDLECLOSEREQ: 5,\n FIXEDSIZEBLOBCOPY: 6,\n FSEVENTWRAP: 7,\n FSREQCALLBACK: 8,\n FSREQPROMISE: 9,\n GETADDRINFOREQWRAP: 10,\n GETNAMEINFOREQWRAP: 11,\n HEAPSNAPSHOT: 12,\n HTTP2SESSION: 13,\n HTTP2STREAM: 14,\n HTTP2PING: 15,\n HTTP2SETTINGS: 16,\n HTTPINCOMINGMESSAGE: 17,\n HTTPCLIENTREQUEST: 18,\n JSSTREAM: 19,\n JSUDPWRAP: 20,\n MESSAGEPORT: 21,\n PIPECONNECTWRAP: 22,\n PIPESERVERWRAP: 23,\n PIPEWRAP: 24,\n PROCESSWRAP: 25,\n PROMISE: 26,\n QUERYWRAP: 27,\n SHUTDOWNWRAP: 28,\n SIGNALWRAP: 29,\n STATWATCHER: 30,\n STREAMPIPE: 31,\n TCPCONNECTWRAP: 32,\n TCPSERVERWRAP: 33,\n TCPWRAP: 34,\n TTYWRAP: 35,\n UDPSENDWRAP: 36,\n UDPWRAP: 37,\n SIGINTWATCHDOG: 38,\n WORKER: 39,\n WORKERHEAPSNAPSHOT: 40,\n WRITEWRAP: 41,\n ZLIB: 42,\n CHECKPRIMEREQUEST: 43,\n PBKDF2REQUEST: 44,\n KEYPAIRGENREQUEST: 45,\n KEYGENREQUEST: 46,\n KEYEXPORTREQUEST: 47,\n CIPHERREQUEST: 48,\n DERIVEBITSREQUEST: 49,\n HASHREQUEST: 50,\n RANDOMBYTESREQUEST: 51,\n RANDOMPRIMEREQUEST: 52,\n SCRYPTREQUEST: 53,\n SIGNREQUEST: 54,\n TLSWRAP: 55,\n VERIFYREQUEST: 56,\n INSPECTORJSBINDING: 57\n};\n$ = {\n AsyncLocalStorage,\n createHook,\n executionAsyncId,\n triggerAsyncId,\n executionAsyncResource,\n asyncWrapProviders,\n AsyncResource\n};\nreturn $})\n"_s;
//
//
diff --git a/test/bun.lockb b/test/bun.lockb
index c94d5aff3..f212bb6c5 100755
--- a/test/bun.lockb
+++ b/test/bun.lockb
Binary files differ
diff --git a/test/js/bun/http/fetch-file-upload.test.ts b/test/js/bun/http/fetch-file-upload.test.ts
index b070fbd6e..197470b9d 100644
--- a/test/js/bun/http/fetch-file-upload.test.ts
+++ b/test/js/bun/http/fetch-file-upload.test.ts
@@ -34,6 +34,104 @@ test("uploads roundtrip", async () => {
server.stop(true);
});
+// https://github.com/oven-sh/bun/issues/3969
+test("formData uploads roundtrip, with a call to .body", async () => {
+ const file = Bun.file(import.meta.dir + "/fetch.js.txt");
+ const body = new FormData();
+ body.append("file", file, "fetch.js.txt");
+
+ const server = Bun.serve({
+ port: 0,
+ development: false,
+ async fetch(req) {
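+      // accessing req.body first should not break the later formData() call (issue #3969)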
+ req.body;
+
+ return new Response(await req.formData());
+ },
+ });
+
+ // @ts-ignore
+ const reqBody = new Request(`http://${server.hostname}:${server.port}`, {
+ body,
+ method: "POST",
+ });
+ const res = await fetch(reqBody);
+ expect(res.status).toBe(200);
+
+  // the Response's Content-Type should include the multipart boundary
+ expect(res.headers.get("Content-Type")).toStartWith("multipart/form-data; boundary=");
+ res.body;
+ const resData = await res.formData();
+ expect(await (resData.get("file") as Blob).arrayBuffer()).toEqual(await file.arrayBuffer());
+
+ server.stop(true);
+});
+
+test("req.formData throws error when stream is in use", async () => {
+ const file = Bun.file(import.meta.dir + "/fetch.js.txt");
+ const body = new FormData();
+ body.append("file", file, "fetch.js.txt");
+ var pass = false;
+ const server = Bun.serve({
+ port: 0,
+ development: false,
+ error(fail) {
+ pass = true;
+ if (fail.toString().includes("is already used")) {
+ return new Response("pass");
+ }
+ return new Response("fail");
+ },
+ async fetch(req) {
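+      // consume part of the body with a reader so the stream is already in use when formData() runs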
+ var reader = req.body?.getReader();
+ await reader?.read();
+ await req.formData();
+ throw new Error("should not reach here");
+ },
+ });
+
+ // @ts-ignore
+ const reqBody = new Request(`http://${server.hostname}:${server.port}`, {
+ body,
+ method: "POST",
+ });
+ const res = await fetch(reqBody);
+ expect(res.status).toBe(200);
+
+  // the error handler's Response is what the client receives
+ expect(await res.text()).toBe("pass");
+ server.stop(true);
+});
+
+test("formData uploads roundtrip, without a call to .body", async () => {
+ const file = Bun.file(import.meta.dir + "/fetch.js.txt");
+ const body = new FormData();
+ body.append("file", file, "fetch.js.txt");
+
+ const server = Bun.serve({
+ port: 0,
+ development: false,
+ async fetch(req) {
+ return new Response(await req.formData());
+ },
+ });
+
+ // @ts-ignore
+ const reqBody = new Request(`http://${server.hostname}:${server.port}`, {
+ body,
+ method: "POST",
+ });
+ const res = await fetch(reqBody);
+ expect(res.status).toBe(200);
+
+  // the Response's Content-Type should include the multipart boundary
+ expect(res.headers.get("Content-Type")).toStartWith("multipart/form-data; boundary=");
+ const resData = await res.formData();
+ expect(await (resData.get("file") as Blob).arrayBuffer()).toEqual(await file.arrayBuffer());
+
+ server.stop(true);
+});
+
test("uploads roundtrip with sendfile()", async () => {
var hugeTxt = "huge".repeat(1024 * 1024 * 32);
const path = join(tmpdir(), "huge.txt");
diff --git a/test/js/bun/http/serve.test.ts b/test/js/bun/http/serve.test.ts
index bba35c085..4c55e779a 100644
--- a/test/js/bun/http/serve.test.ts
+++ b/test/js/bun/http/serve.test.ts
@@ -213,63 +213,97 @@ it("request.url should be based on the Host header", async () => {
describe("streaming", () => {
describe("error handler", () => {
- it("throw on pull reports an error and close the connection", async () => {
- var pass = false;
+ it("throw on pull renders headers, does not call error handler", async () => {
+ var pass = true;
await runTest(
{
error(e) {
- pass = true;
- return new Response("PASS", { status: 555 });
+ pass = false;
+ return new Response("FAIL!", { status: 555 });
},
fetch(req) {
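+            // the status and headers passed below should still be sent even though pull() throws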
return new Response(
new ReadableStream({
pull(controller) {
- throw new Error("FAIL");
+ throw new Error("Not a real error");
},
+ cancel(reason) {},
}),
+ {
+ status: 402,
+ headers: {
+ "I-AM": "A-TEAPOT",
+ },
+ },
);
},
},
async server => {
const response = await fetch(`http://${server.hostname}:${server.port}`);
- if (response.status > 0) {
- expect(response.status).toBe(555);
- expect(await response.text()).toBe("PASS");
- }
+ expect(response.status).toBe(402);
+ expect(response.headers.get("I-AM")).toBe("A-TEAPOT");
+ expect(await response.text()).toBe("");
expect(pass).toBe(true);
},
);
});
- it("throw on pull after writing should not call the error handler", async () => {
- var pass = true;
- await runTest(
- {
- error(e) {
- pass = false;
- return new Response("FAIL", { status: 555 });
- },
- fetch(req) {
- return new Response(
- new ReadableStream({
+ describe("throw on pull after writing should not call the error handler", () => {
+ async function execute(options: ResponseInit) {
+ var pass = true;
+ await runTest(
+ {
+ error(e) {
+ pass = false;
+ return new Response("FAIL", { status: 555 });
+ },
+ fetch(req) {
+ const stream = new ReadableStream({
async pull(controller) {
controller.enqueue("PASS");
controller.close();
- throw new Error("error");
+ throw new Error("FAIL");
},
- }),
- );
+ });
+ return new Response(stream, options);
+ },
},
- },
- async server => {
- const response = await fetch(`http://${server.hostname}:${server.port}`);
- // connection terminated
- expect(response.status).toBe(200);
- expect(await response.text()).toBe("PASS");
- expect(pass).toBe(true);
- },
- );
+ async server => {
+ const response = await fetch(`http://${server.hostname}:${server.port}`);
+ // connection terminated
+ expect(await response.text()).toBe("");
+ expect(response.status).toBe(options.status ?? 200);
+ expect(pass).toBe(true);
+ },
+ );
+ }
+
+ it("with headers", async () => {
+ await execute({
+ headers: {
+ "X-A": "123",
+ },
+ });
+ });
+
+ it("with headers and status", async () => {
+ await execute({
+ status: 204,
+ headers: {
+ "X-A": "123",
+ },
+ });
+ });
+
+ it("with status", async () => {
+ await execute({
+ status: 204,
+ });
+ });
+
+ it("with empty object", async () => {
+ await execute({});
+ });
});
});
@@ -1004,19 +1038,26 @@ it("request body and signal life cycle", async () => {
};
const server = Bun.serve({
+ port: 0,
async fetch(req) {
- await queueMicrotask(() => Bun.gc(true));
return new Response(await renderToReadableStream(app_jsx), headers);
},
});
try {
const requests = [];
- for (let i = 0; i < 1000; i++) {
- requests.push(fetch(`http://${server.hostname}:${server.port}`));
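+    // send the requests in 10 waves of 250, awaiting each wave and forcing GC between waves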
+ for (let j = 0; j < 10; j++) {
+ for (let i = 0; i < 250; i++) {
+ requests.push(fetch(`http://${server.hostname}:${server.port}`));
+ }
+
+ await Promise.all(requests);
+ requests.length = 0;
+ Bun.gc(true);
}
- await Promise.all(requests);
- } catch {}
+ } catch (e) {
+ console.error(e);
+ }
await Bun.sleep(10);
expect(true).toBe(true);
server.stop(true);
diff --git a/test/js/bun/stream/direct-readable-stream.test.tsx b/test/js/bun/stream/direct-readable-stream.test.tsx
index c06840947..1f090671b 100644
--- a/test/js/bun/stream/direct-readable-stream.test.tsx
+++ b/test/js/bun/stream/direct-readable-stream.test.tsx
@@ -229,12 +229,17 @@ describe("ReactDOM", () => {
server = serve({
port: 0,
async fetch(req) {
- return new Response(await renderToReadableStream(reactElement));
+ return new Response(await renderToReadableStream(reactElement), {
+ headers: {
+ "X-React": "1",
+ },
+ });
},
});
const response = await fetch("http://localhost:" + server.port + "/");
const result = await response.text();
expect(result.replaceAll("<!-- -->", "")).toBe(inputString);
+ expect(response.headers.get("X-React")).toBe("1");
} finally {
server?.stop(true);
}
diff --git a/test/js/third_party/napi_create_external/napi-create-external.test.ts b/test/js/third_party/napi_create_external/napi-create-external.test.ts
deleted file mode 100644
index 47025e100..000000000
--- a/test/js/third_party/napi_create_external/napi-create-external.test.ts
+++ /dev/null
@@ -1,195 +0,0 @@
-// @ts-nocheck
-import { test, it, describe, expect } from "bun:test";
-import { withoutAggressiveGC } from "harness";
-import * as _ from "lodash";
-
-function rebase(str, inBase, outBase) {
- const mapBase = (b: number) => (b === 2 ? 32 : b === 16 ? 8 : null);
- const stride = mapBase(inBase);
- const pad = mapBase(outBase);
- if (!stride) throw new Error(`Bad inBase ${inBase}`);
- if (!pad) throw new Error(`Bad outBase ${outBase}`);
- if (str.length % stride) throw new Error(`Bad string length ${str.length}`);
- const out = [];
- for (let i = 0; i < str.length; i += stride)
- out.push(
- parseInt(str.slice(i, i + stride), inBase)
- .toString(outBase)
- .padStart(pad, "0"),
- );
- return out.join("");
-}
-
-function expectDeepEqual(a, b) {
- expect(a).toEqual(b);
-}
-class HashMaker {
- constructor(length) {
- this.length = length;
- this._dist = {};
- }
- length: number;
- _dist: any;
-
- binToHex(binHash) {
- if (binHash.length !== this.length) throw new Error(`Hash length mismatch ${this.length} != ${binHash.length}`);
- return rebase(binHash, 2, 16);
- }
-
- makeBits() {
- const bits = [];
- for (let i = 0; i < this.length; i++) bits.push(i);
- return _.shuffle(bits);
- }
-
- makeRandom() {
- const bits = [];
- for (let i = 0; i < this.length; i++) bits.push(Math.random() < 0.5 ? 1 : 0);
- return bits;
- }
-
- get keySet() {
- return (this._set = this._set || new Set(this.data));
- }
-
- randomKey() {
- while (true) {
- const hash = this.binToHex(this.makeRandom().join(""));
- if (!this.keySet.has(hash)) return hash;
- }
- }
-
- get data() {
- return (this._data =
- this._data ||
- (() => {
- const bits = this.makeBits();
- const base = this.makeRandom();
- const data = [];
- for (let stride = 0; bits.length; stride++) {
- const flip = bits.splice(0, stride);
- for (const bit of flip) base[bit] = 1 - base[bit];
- data.push(this.binToHex(base.join("")));
- }
- return data;
- })());
- }
-
- get random() {
- const d = this.data;
- return d[Math.floor(Math.random() * d.length)];
- }
-
- distance(a, b) {
- const bitCount = n => {
- n = n - ((n >> 1) & 0x55555555);
- n = (n & 0x33333333) + ((n >> 2) & 0x33333333);
- return (((n + (n >> 4)) & 0xf0f0f0f) * 0x1010101) >> 24;
- };
-
- if (a === b) return 0;
- if (a > b) return this.distance(b, a);
- const hash = a + "-" + b;
- return (this._dist[hash] =
- this._dist[hash] ||
- (() => {
- let dist = 0;
- for (let i = 0; i < a.length; i += 8) {
- const va = parseInt(a.slice(i, i + 8), 16);
- const vb = parseInt(b.slice(i, i + 8), 16);
- dist += bitCount(va ^ vb);
- }
- return dist;
- })());
- }
-
- query(baseKey, maxDist) {
- const out = [];
- for (const key of this.data) {
- const distance = this.distance(key, baseKey);
- if (distance <= maxDist) out.push({ key, distance });
- }
- return out.sort((a, b) => a.distance - b.distance);
- }
-}
-
-const treeClass = require("bktree-fast/native");
-
-withoutAggressiveGC(() => {
- // this test is too slow
- for (let keyLen = 64; keyLen <= 64; keyLen += 64) {
- // for (let keyLen = 64; keyLen <= 512; keyLen += 64) {
- const hm = new HashMaker(keyLen);
- describe(`Key length: ${keyLen}`, () => {
- it("should compute distance", () => {
- const tree = new treeClass(keyLen);
- for (const a of hm.data) for (const b of hm.data) expect(tree.distance(a, b)).toBe(hm.distance(a, b));
- });
-
- it("should know which keys it has", () => {
- const tree = new treeClass(keyLen).add(hm.data);
- expectDeepEqual(
- hm.data.map(hash => tree.has(hash)),
- hm.data.map(() => true),
- );
- // Not interested in the hash
- for (const hash of hm.data) expect(tree.has(hm.randomKey())).toBe(false);
- });
-
- it("should know the tree size", () => {
- const tree = new treeClass(keyLen, { foo: 1 });
- expect(tree.size).toBe(0);
- tree.add(hm.data);
- expect(tree.size).toBe(hm.data.length);
- tree.add(hm.data);
- expect(tree.size).toBe(hm.data.length);
- });
-
- it("should walk the tree", () => {
- const tree = new treeClass(keyLen).add(hm.data);
- const got = [];
- tree.walk((hash, depth) => got.push(hash));
- expectDeepEqual(got.sort(), hm.data.slice(0).sort());
- });
-
- it("should query", () => {
- ((treeClass, expectDeepEqual) => {
- const tree = new treeClass(keyLen).add(hm.data);
-
- for (let dist = 0; dist <= hm.length; dist++) {
- for (const baseKey of [hm.random, hm.data[0]]) {
- const baseKey = hm.random;
- const got = [];
- tree.query(baseKey, dist, (key, distance) => got.push({ key, distance }));
- const want = hm.query(baseKey, dist);
- expectDeepEqual(
- got.sort((a, b) => a.distance - b.distance),
- want,
- );
- expectDeepEqual(tree.find(baseKey, dist), want);
- }
- }
- })(treeClass, expectDeepEqual);
- });
- });
- }
-
- describe("Misc functions", () => {
- it("should pad keys", () => {
- const tree = new treeClass(64);
- expect(tree.padKey("1")).toBe("0000000000000001");
- tree.add(["1", "2", "3"]);
-
- const got = [];
- tree.query("2", 3, (hash, distance) => got.push({ hash, distance }));
- const res = got.sort((a, b) => a.distance - b.distance);
- const want = [
- { hash: "0000000000000002", distance: 0 },
- { hash: "0000000000000003", distance: 1 },
- { hash: "0000000000000001", distance: 2 },
- ];
-
- expectDeepEqual(res, want);
- });
- });
-});
diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts
index efef6a161..dc082fd2f 100644
--- a/test/js/web/fetch/fetch.stream.test.ts
+++ b/test/js/web/fetch/fetch.stream.test.ts
@@ -412,8 +412,7 @@ describe("fetch() with streaming", () => {
const fixture = matrix[i];
for (let j = 0; j < matrix.length; j++) {
const fixtureb = matrix[j];
- const test = fixture.name == "empty" && fixtureb.name == "empty" ? it.todo : it;
- test(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => {
+ it(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => {
let server: Server | null = null;
try {
//@ts-ignore
@@ -917,12 +916,11 @@ describe("fetch() with streaming", () => {
drain(socket) {},
},
});
-
- const res = await fetch(`http://${server.hostname}:${server.port}`, {
- signal: AbortSignal.timeout(1000),
- });
- gcTick(false);
try {
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {
+ signal: AbortSignal.timeout(1000),
+ });
+ gcTick(false);
const reader = res.body?.getReader();
let buffer = Buffer.alloc(0);
@@ -995,6 +993,7 @@ describe("fetch() with streaming", () => {
compressed[0] = 0; // corrupt data
cork = false;
for (var i = 0; i < 5; i++) {
+ compressed[size * i] = 0; // corrupt data even more
await write(compressed.slice(size * i, size * (i + 1)));
}
socket.flush();
@@ -1003,10 +1002,10 @@ describe("fetch() with streaming", () => {
},
});
- const res = await fetch(`http://${server.hostname}:${server.port}`, {});
- gcTick(false);
-
try {
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+
const reader = res.body?.getReader();
let buffer = Buffer.alloc(0);
@@ -1079,23 +1078,27 @@ describe("fetch() with streaming", () => {
// 10 extra bytes that we will never send; in this case the client will wait for the connection to close
await write("Content-Length: " + compressed.byteLength + 10 + "\r\n");
await write("\r\n");
+
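+          // make the socket available to the test before streaming the incomplete body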
+ resolveSocket(socket);
+
const size = compressed.byteLength / 5;
for (var i = 0; i < 5; i++) {
cork = false;
await write(compressed.slice(size * i, size * (i + 1)));
}
socket.flush();
- resolveSocket(socket);
},
drain(socket) {},
},
});
- const res = await fetch(`http://${server.hostname}:${server.port}`, {});
- gcTick(false);
+ let socket: Socket | null = null;
- let socket: Socket | null = await promise;
try {
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ socket = await promise;
+ gcTick(false);
+
const reader = res.body?.getReader();
let buffer = Buffer.alloc(0);
diff --git a/test/package.json b/test/package.json
index 099f3d312..4d7cd6606 100644
--- a/test/package.json
+++ b/test/package.json
@@ -10,7 +10,6 @@
"dependencies": {
"@prisma/client": "5.1.1",
"@swc/core": "1.3.38",
- "bktree-fast": "0.0.7",
"body-parser": "1.20.2",
"comlink": "4.4.1",
"dedent": "0.7.0",
@@ -39,7 +38,6 @@
},
"private": true,
"scripts": {
- "typecheck": "tsc --noEmit",
- "postinstall": "cd node_modules/bktree-fast && npx node-gyp configure build"
+ "typecheck": "tsc --noEmit"
}
}