author    Ciro Spaciari <ciro.spaciari@gmail.com>  2023-08-22 00:30:34 -0300
committer GitHub <noreply@github.com>              2023-08-21 20:30:34 -0700
commit    9027484ae1a9eaf9769d79828db48de66450a3fc (patch)
tree      e6b679132046bbc0ef254a170c0b0ec3030d3fa2
parent    91eacade975a522f7ad3d62185ff8adace3aad97 (diff)
download  bun-9027484ae1a9eaf9769d79828db48de66450a3fc.tar.gz
          bun-9027484ae1a9eaf9769d79828db48de66450a3fc.tar.zst
          bun-9027484ae1a9eaf9769d79828db48de66450a3fc.zip
fetch(stream) add stream support for compressed and uncompressed data (#4127)
* streams non-compressed data in 64kb chunks (at least)
* fmt
* wip remove pause
* fix default streaming and buffering
* fix atomic lags
* fix size
* make chunked encoding work again (WIP streaming chunked)
* WIP: chunked encoding streaming
* fix end of streaming
* working streaming + compression
* add fixes + tests
* fmt + fix proxy
* fix oopsies
* codegen after merge
* fmt + fixes
* more fixes
* more fixes and logs
* avoid double free
* check empty before pop
* check empty on pop
* fix copy to real when complete
* remove unnecessary logs
* better has_schedule_callback swap, body locked size helper, remove isEmpty from unbounded_queue pop
* fix response ref, fix body_size
* add deflate support, fix error throw, add more tests
* codegen after merge
* remove logs, add connection close test
* fix macOS build
* fix redirect error option
* make body_size more clear
* support new Response(response)
* toString DOMWrapper objects properly instead of supporting response in Response constructor
* ignore headers with no name, add more tests
* oops
* handle transform with fetch
* add gz image stream test
* remove duplicate test
* fix missing chunk on macOS under pressure
* oops include all OS
* some fixes
* compare buffers instead of sizes
* refactor err.err and protect it
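What this change enables from JavaScript, as a minimal consumer-side sketch (the URL is a placeholder, not part of this commit): response body chunks become observable as soon as they arrive, instead of only after the whole body has been buffered.

const response = await fetch("http://localhost:3000/streamed"); // placeholder endpoint
const reader = response.body!.getReader();
let received = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  received += value.byteLength; // each chunk is handed to JS as it is read from the socket
}
console.log(`received ${received} bytes across multiple chunks`);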
-rw-r--r--  misctools/fetch.zig                         2
-rw-r--r--  src/bun.js/api/bun/subprocess.zig           6
-rw-r--r--  src/bun.js/bindings/bindings.cpp            2
-rw-r--r--  src/bun.js/event_loop.zig                   3
-rw-r--r--  src/bun.js/webcore/blob.zig                20
-rw-r--r--  src/bun.js/webcore/body.zig                18
-rw-r--r--  src/bun.js/webcore/response.zig           260
-rw-r--r--  src/bun.js/webcore/streams.zig             89
-rw-r--r--  src/cli/create_command.zig                 56
-rw-r--r--  src/cli/upgrade_command.zig                28
-rw-r--r--  src/deps/uws.zig                           16
-rw-r--r--  src/http/zlib.zig                           1
-rw-r--r--  src/http_client_async.zig                 373
-rw-r--r--  src/zlib.zig                                8
-rw-r--r--  test/js/web/fetch/fetch-gzip.test.ts        4
-rw-r--r--  test/js/web/fetch/fetch.stream.test.ts   1129
-rw-r--r--  test/js/web/fetch/fetch.test.ts            17
-rw-r--r--  test/js/web/fetch/fixture.png               bin 0 -> 82022 bytes
-rw-r--r--  test/js/web/fetch/fixture.png.gz            bin 0 -> 78116 bytes
19 files changed, 1876 insertions(+), 156 deletions(-)
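The compressed path behaves the same way from the caller's perspective. Below is a hedged sketch along the lines of the new tests in test/js/web/fetch/fetch.stream.test.ts (the payload and server setup are illustrative, not taken from this commit): the client observes already-decompressed chunks while the transfer is still in flight.

import { gzipSync } from "bun";

const payload = "Hello".repeat(1024);
const server = Bun.serve({
  port: 0,
  fetch() {
    // pre-compressed body; fetch() inflates it incrementally on the client side
    return new Response(gzipSync(Buffer.from(payload)), {
      headers: { "Content-Encoding": "gzip", "Content-Type": "text/plain" },
    });
  },
});

const res = await fetch(`http://${server.hostname}:${server.port}/`);
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let text = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  text += decoder.decode(value, { stream: true }); // decompressed bytes, chunk by chunk
}
text += decoder.decode(); // flush any buffered multi-byte sequence
console.log(text === payload); // true
server.stop();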
diff --git a/misctools/fetch.zig b/misctools/fetch.zig
index e450ab5d3..bb9e09a2a 100644
--- a/misctools/fetch.zig
+++ b/misctools/fetch.zig
@@ -187,7 +187,7 @@ pub fn main() anyerror!void {
var ctx = try default_allocator.create(HTTP.HTTPChannelContext);
ctx.* = .{
.channel = channel,
- .http = try HTTP.AsyncHTTP.init(default_allocator, args.method, args.url, args.headers, args.headers_buf, response_body_string, args.body, 0, HTTP.FetchRedirect.follow),
+ .http = try HTTP.AsyncHTTP.init(default_allocator, args.method, args.url, args.headers, args.headers_buf, response_body_string, args.body, 0, HTTP.FetchRedirect.follow,),
};
ctx.http.callback = HTTP.HTTPChannelContext.callback;
var batch = HTTPThread.Batch{};
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 4fb02b8af..b8bf6939c 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -616,7 +616,11 @@ pub const Subprocess = struct {
return;
},
.err => |err| {
- this.status = .{ .err = err };
+ if (err == .Error) {
+ this.status = .{ .err = err.Error };
+ } else {
+ this.status = .{ .err = JSC.Node.Syscall.Error.fromCode(.CANCELED, .read) };
+ }
this.fifo.close();
return;
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index 37594288d..badfd3437 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -1181,7 +1181,7 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void*
for (size_t j = 0; j < end; j++) {
PicoHTTPHeader header = pico_headers.ptr[j];
- if (header.value.len == 0)
+ if (header.value.len == 0 || header.name.len == 0)
continue;
StringView nameView = StringView(reinterpret_cast<const char*>(header.name.ptr), header.name.len);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 92613d0f0..92874b6a4 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -545,8 +545,7 @@ pub const EventLoop = struct {
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
- fetch_task.onDone();
- fetch_task.deinit();
+ fetch_task.onProgressUpdate();
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig
index df2e17ce4..c794ab59b 100644
--- a/src/bun.js/webcore/blob.zig
+++ b/src/bun.js/webcore/blob.zig
@@ -3794,6 +3794,10 @@ pub const Blob = struct {
} else {
return build.blob.dupe();
}
+ } else if (current.toSliceClone(global)) |sliced| {
+ if (sliced.allocator.get()) |allocator| {
+ return Blob.initWithAllASCII(bun.constStrToU8(sliced.slice()), allocator, global, false);
+ }
}
},
@@ -3886,6 +3890,14 @@ pub const Blob = struct {
could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false);
joiner.append(blob.sharedView(), 0, null);
continue;
+ } else if (current.toSliceClone(global)) |sliced| {
+ const allocator = sliced.allocator.get();
+ could_have_non_ascii = could_have_non_ascii or allocator != null;
+ joiner.append(
+ sliced.slice(),
+ 0,
+ allocator,
+ );
}
},
else => {},
@@ -3900,6 +3912,14 @@ pub const Blob = struct {
if (current.as(Blob)) |blob| {
could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false);
joiner.append(blob.sharedView(), 0, null);
+ } else if (current.toSliceClone(global)) |sliced| {
+ const allocator = sliced.allocator.get();
+ could_have_non_ascii = could_have_non_ascii or allocator != null;
+ joiner.append(
+ sliced.slice(),
+ 0,
+ allocator,
+ );
}
},
diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig
index b59125bc6..fa0ec9b24 100644
--- a/src/bun.js/webcore/body.zig
+++ b/src/bun.js/webcore/body.zig
@@ -210,10 +210,24 @@ pub const Body = struct {
/// used in the HTTP server to ignore request bodies unless asked for
onStartBuffering: ?*const fn (ctx: *anyopaque) void = null,
onStartStreaming: ?*const fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
+ size_hint: Blob.SizeType = 0,
deinit: bool = false,
action: Action = Action{ .none = {} },
+ /// For HTTP client requests:
+ /// when Content-Length is provided, this represents the whole size of the body.
+ /// If chunked encoded, this represents the total received size (ignoring the chunk headers).
+ /// If the size is unknown, this will be 0.
+ fn sizeHint(this: *const PendingValue) Blob.SizeType {
+ if (this.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ return readable.ptr.Bytes.size_hint;
+ }
+ }
+ return this.size_hint;
+ }
+
pub fn toAnyBlob(this: *PendingValue) ?AnyBlob {
if (this.promise != null)
return null;
@@ -375,6 +389,7 @@ pub const Body = struct {
.Blob => this.Blob.size,
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())),
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
@@ -382,9 +397,9 @@ pub const Body = struct {
pub fn fastSize(this: *const Value) Blob.SizeType {
return switch (this.*) {
- .Blob => this.Blob.size,
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.byteSlice().len)),
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
@@ -394,6 +409,7 @@ pub const Body = struct {
return switch (this.*) {
.InternalBlob => this.InternalBlob.sliceConst().len,
.WTFStringImpl => this.WTFStringImpl.byteSlice().len,
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => this.InlineBlob.sliceConst().len,
else => 0,
};
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 4b15efc0e..01ecfad36 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -44,6 +44,7 @@ const JSPrinter = bun.js_printer;
const picohttp = @import("root").bun.picohttp;
const StringJoiner = @import("../../string_joiner.zig");
const uws = @import("root").bun.uws;
+const Mutex = @import("../../lock.zig").Lock;
const InlineBlob = JSC.WebCore.InlineBlob;
const AnyBlob = JSC.WebCore.AnyBlob;
@@ -123,6 +124,7 @@ pub const Response = struct {
pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
const Writer = @TypeOf(writer);
try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len())});
+
{
formatter.indent += 1;
defer formatter.indent -|= 1;
@@ -618,11 +620,21 @@ pub const Fetch = struct {
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
+ /// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
+ /// buffer used to stream response to JS
+ scheduled_response_buffer: MutableString = undefined,
+ /// response strong ref
+ response: JSC.Strong = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: JSC.JSPromise.Strong,
concurrent_task: JSC.ConcurrentTask = .{},
poll_ref: JSC.PollRef = .{},
+ /// For HTTP client requests:
+ /// when Content-Length is provided, this represents the whole size of the body.
+ /// If chunked encoded, this represents the total received size (ignoring the chunk headers).
+ /// If it is not chunked encoded and Content-Length is not provided, this will be unknown.
+ body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
/// This is url + proxy memory buffer and is owned by FetchTasklet
/// We always clone url and proxy (if provided)
@@ -630,11 +642,14 @@ pub const Fetch = struct {
signal: ?*JSC.WebCore.AbortSignal = null,
aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
abort_reason: JSValue = JSValue.zero,
// Custom Hostname
hostname: ?[]u8 = null,
+ is_waiting_body: bool = false,
+ mutex: Mutex,
tracker: JSC.AsyncTaskTracker,
@@ -691,6 +706,8 @@ pub const Fetch = struct {
this.result.deinitMetadata();
this.response_buffer.deinit();
+ this.response.deinit();
+ this.scheduled_response_buffer.deinit();
this.request_body.detach();
if (this.abort_reason != .zero)
@@ -707,9 +724,122 @@ pub const Fetch = struct {
this.javascript_vm.allocator.destroy(this);
}
- pub fn onDone(this: *FetchTasklet) void {
+ pub fn onBodyReceived(this: *FetchTasklet) void {
+ const success = this.result.isSuccess();
+ const globalThis = this.global_this;
+ defer {
+ if (!success or !this.result.has_more) {
+ var vm = globalThis.bunVM();
+ this.poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
+ if (!success) {
+ const err = this.onReject();
+ err.ensureStillAlive();
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ readable.ptr.Bytes.onData(
+ .{
+ .err = .{ .JSValue = err },
+ },
+ bun.default_allocator,
+ );
+ return;
+ }
+ }
+
+ response.body.value.toErrorInstance(err, globalThis);
+ return;
+ }
+ }
+
+ globalThis.throwValue(err);
+ return;
+ }
+
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ readable.ptr.Bytes.size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ const chunk = scheduled_response_buffer.items;
+
+ if (this.result.has_more) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+
+ // clean for reuse later
+ this.scheduled_response_buffer.reset();
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+
+ return;
+ }
+ } else {
+ response.body.value.Locked.size_hint = this.getSizeHint();
+ }
+ // we will reach here when not streaming
+ if (!this.result.has_more) {
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ // done, resolve the body
+ var old = body.value;
+ var body_value = Body.Value{
+ .InternalBlob = .{
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ response.body.value = body_value;
+
+ this.scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ if (old == .Locked) {
+ old.resolve(&response.body.value, this.global_this);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
+ this.mutex.lock();
+ defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+ }
+ if (this.is_waiting_body) {
+ return this.onBodyReceived();
+ }
const globalThis = this.global_this;
var ref = this.promise;
@@ -718,13 +848,22 @@ pub const Fetch = struct {
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
- defer poll_ref.unref(vm);
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ poll_ref.unref(vm);
this.clearData();
+ this.deinit();
return;
}
+ defer {
+ if (!this.is_waiting_body) {
+ poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
tracker.willDispatch(globalThis);
@@ -735,7 +874,6 @@ pub const Fetch = struct {
false => this.onReject(),
};
result.ensureStillAlive();
- this.clearData();
promise_value.ensureStillAlive();
@@ -784,25 +922,77 @@ pub const Fetch = struct {
return fetch_error.toErrorInstance(this.global_this);
}
- fn toBodyValue(this: *FetchTasklet) Body.Value {
- var response_buffer = this.response_buffer.list;
- this.response_buffer = .{
- .allocator = default_allocator,
- .list = .{
- .items = &.{},
- .capacity = 0,
- },
+ pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
+ const this = bun.cast(*FetchTasklet, ctx);
+ if (this.http) |http| {
+ http.enableBodyStreaming();
+ }
+ if (this.aborted.load(.Acquire)) {
+ return JSC.WebCore.DrainResult{
+ .aborted = {},
+ };
+ }
+
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ const size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+ // This means we have received part of the body but not the whole thing
+ if (scheduled_response_buffer.items.len > 0) {
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ return .{
+ .owned = .{
+ .list = scheduled_response_buffer.toManaged(bun.default_allocator),
+ .size_hint = size_hint,
+ },
+ };
+ }
+
+ return .{
+ .estimated_size = size_hint,
};
+ }
+
+ fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
+ return switch (this.body_size) {
+ .content_length => @truncate(this.body_size.content_length),
+ .total_received => @truncate(this.body_size.total_received),
+ else => 0,
+ };
+ }
- // if (response_buffer.items.len < InlineBlob.available_bytes) {
- // const inline_blob = InlineBlob.init(response_buffer.items);
- // defer response_buffer.deinit(bun.default_allocator);
- // return .{ .InlineBlob = inline_blob };
- // }
+ fn toBodyValue(this: *FetchTasklet) Body.Value {
+ if (this.is_waiting_body) {
+ const response = Body.Value{
+ .Locked = .{
+ .size_hint = this.getSizeHint(),
+ .task = this,
+ .global = this.global_this,
+ .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback,
+ },
+ };
+ return response;
+ }
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
const response = Body.Value{
.InternalBlob = .{
- .bytes = response_buffer.toManaged(bun.default_allocator),
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
},
};
@@ -811,6 +1001,7 @@ pub const Fetch = struct {
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
const http_response = this.result.response;
+ this.is_waiting_body = this.result.has_more;
return Response{
.allocator = allocator,
.url = bun.String.createAtomIfPossible(this.result.href),
@@ -830,7 +1021,10 @@ pub const Fetch = struct {
const allocator = bun.default_allocator;
var response = allocator.create(Response) catch unreachable;
response.* = this.toResponse(allocator);
- return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response);
+ const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response);
+ response_js.ensureStillAlive();
+ this.response = JSC.Strong.create(response_js, this.global_this);
+ return response_js;
}
pub fn get(
@@ -843,6 +1037,14 @@ pub const Fetch = struct {
var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet);
fetch_tasklet.* = .{
+ .mutex = Mutex.init(),
+ .scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
.response_buffer = MutableString{
.allocator = bun.default_allocator,
.list = .{
@@ -905,6 +1107,8 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout;
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
+ // we want to return after headers are received
+ fetch_tasklet.http.?.signalHeaderProgress();
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -973,8 +1177,26 @@ pub const Fetch = struct {
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
- task.response_buffer = result.body.?.*;
+ task.mutex.lock();
+ defer task.mutex.unlock();
task.result = result;
+ task.body_size = result.body_size;
+
+ const success = result.isSuccess();
+ task.response_buffer = result.body.?.*;
+
+ if (success) {
+ _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
+ }
+
+ // reset for reuse
+ task.response_buffer.reset();
+
+ if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
+ if (has_schedule_callback) {
+ return;
+ }
+ }
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index be6942392..955d10ffb 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -525,9 +525,16 @@ pub const StreamResult = union(Tag) {
into_array: IntoArray,
into_array_and_done: IntoArray,
pending: *Pending,
- err: Syscall.Error,
+
+ err: union(Err) { Error: Syscall.Error, JSValue: JSC.JSValue },
+
done: void,
+ pub const Err = enum {
+ Error,
+ JSValue,
+ };
+
pub const Tag = enum {
owned,
owned_and_done,
@@ -757,7 +764,14 @@ pub const StreamResult = union(Tag) {
promise.asValue(globalThis).unprotect();
switch (result) {
.err => |err| {
- promise.reject(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ promise.reject(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ promise.reject(globalThis, js_err);
+ }
},
.done => {
promise.resolve(globalThis, JSValue.jsBoolean(false));
@@ -803,7 +817,13 @@ pub const StreamResult = union(Tag) {
},
.err => |err| {
- return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.toJS(globalThis))).asValue(globalThis);
+ if (err == .Error) {
+ return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.Error.toJS(globalThis))).asValue(globalThis);
+ }
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ return JSC.JSPromise.rejectedPromise(globalThis, js_err).asValue(globalThis);
},
// false == controller.close()
@@ -2380,6 +2400,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return true;
} else {
this.has_backpressure = !this.res.write(buf);
+ if (this.has_backpressure) {
+ this.res.onWritable(*@This(), onWritable, this);
+ }
return true;
}
@@ -2986,7 +3009,14 @@ pub fn ReadableStreamSource(
pub fn processResult(globalThis: *JSGlobalObject, flags: JSValue, result: StreamResult) JSC.JSValue {
switch (result) {
.err => |err| {
- globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ globalThis.vm().throwError(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ globalThis.vm().throwError(globalThis, js_err);
+ }
return JSValue.jsUndefined();
},
.temporary_and_done, .owned_and_done, .into_array_and_done => {
@@ -3301,12 +3331,29 @@ pub const ByteStream = struct {
if (is_really_done) {
this.done = true;
- this.pending.result = .{
- .into_array_and_done = .{
- .value = this.value(),
- .len = @as(Blob.SizeType, @truncate(to_copy.len)),
- },
- };
+
+ if (to_copy.len == 0) {
+ if (stream == .err) {
+ if (stream.err == .Error) {
+ this.pending.result = .{ .err = .{ .Error = stream.err.Error } };
+ }
+ const js_err = stream.err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.protect();
+ this.pending.result = .{ .err = .{ .JSValue = js_err } };
+ } else {
+ this.pending.result = .{
+ .done = {},
+ };
+ }
+ } else {
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @as(Blob.SizeType, @truncate(to_copy.len)),
+ },
+ };
+ }
} else {
this.pending.result = .{
.into_array = .{
@@ -3488,7 +3535,7 @@ pub const ReadResult = union(enum) {
pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult {
return switch (this) {
.pending => .{ .pending = pending },
- .err => .{ .err = this.err },
+ .err => .{ .err = .{ .Error = this.err } },
.done => .{ .done = {} },
.read => |slice| brk: {
const owned = slice.ptr != buf.ptr;
@@ -4064,17 +4111,21 @@ pub const File = struct {
this.concurrent.read = @as(Blob.SizeType, @truncate(result catch |err| {
if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
this.pending.result = .{
- .err = Syscall.Error{
- .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
+ .syscall = .read,
+ },
},
};
} else {
this.pending.result = .{
- .err = Syscall.Error{
- // this is too hacky
- .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ // this is too hacky
+ .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
+ .syscall = .read,
+ },
},
};
}
@@ -4101,7 +4152,7 @@ pub const File = struct {
else => {},
}
- this.pending.result = .{ .err = err };
+ this.pending.result = .{ .err = .{ .Error = err } };
scheduleMainThreadTask(this);
return;
},
diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig
index 544329a98..284de1fd5 100644
--- a/src/cli/create_command.zig
+++ b/src/cli/create_command.zig
@@ -1852,7 +1852,19 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ api_url,
+ header_entries,
+ headers_buf,
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -1916,7 +1928,19 @@ pub const Example = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ url,
+ .{},
+ "",
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
async_http.client.progress_node = progress;
var response = try async_http.sendSync(true);
@@ -1992,7 +2016,19 @@ pub const Example = struct {
http_proxy = env_loader.getHttpProxy(parsed_tarball_url);
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, parsed_tarball_url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ parsed_tarball_url,
+ .{},
+ "",
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
async_http.client.progress_node = progress;
refresher.maybeRefresh();
@@ -2022,7 +2058,19 @@ pub const Example = struct {
var mutable = try ctx.allocator.create(MutableString);
mutable.* = try MutableString.init(ctx.allocator, 2048);
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ url,
+ .{},
+ "",
+ mutable,
+ "",
+ 60 * std.time.ns_per_min,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
if (Output.enable_ansi_colors) {
async_http.client.progress_node = progress_node;
diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig
index 3fadfe5c2..79a7777f3 100644
--- a/src/cli/upgrade_command.zig
+++ b/src/cli/upgrade_command.zig
@@ -223,7 +223,19 @@ pub const UpgradeCommand = struct {
// ensure very stable memory address
var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable;
- async_http.* = HTTP.AsyncHTTP.initSync(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ allocator,
+ .GET,
+ api_url,
+ header_entries,
+ headers_buf,
+ &metadata_body,
+ "",
+ 60 * std.time.ns_per_min,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
if (!silent) async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
@@ -454,7 +466,19 @@ pub const UpgradeCommand = struct {
var zip_file_buffer = try ctx.allocator.create(MutableString);
zip_file_buffer.* = try MutableString.init(ctx.allocator, @max(version.size, 1024));
- async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, zip_url, .{}, "", zip_file_buffer, "", timeout, http_proxy, null, HTTP.FetchRedirect.follow);
+ async_http.* = HTTP.AsyncHTTP.initSync(
+ ctx.allocator,
+ .GET,
+ zip_url,
+ .{},
+ "",
+ zip_file_buffer,
+ "",
+ timeout,
+ http_proxy,
+ null,
+ HTTP.FetchRedirect.follow,
+ );
async_http.client.timeout = timeout;
async_http.client.progress_node = progress;
const response = try async_http.sendSync(true);
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 5714eb09f..2610b0720 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -200,6 +200,7 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
this.socket,
).?;
}
+
pub fn flush(this: ThisSocket) void {
return us_socket_flush(
comptime ssl_int,
@@ -687,6 +688,13 @@ pub const SocketContext = opaque {
us_socket_context_free(@as(i32, 0), this);
}
+ fn getLoop(this: *SocketContext, ssl: bool) ?*Loop {
+ if (ssl) {
+ return us_socket_context_loop(@as(i32, 1), this);
+ }
+ return us_socket_context_loop(@as(i32, 0), this);
+ }
+
/// closes and deinits the SocketContext
pub fn deinit(this: *SocketContext, ssl: bool) void {
this.close(ssl);
@@ -1000,6 +1008,14 @@ pub const Poll = opaque {
us_poll_stop(self, loop);
}
+ pub fn change(self: *Poll, loop: *Loop, events: i32) void {
+ us_poll_change(self, loop, events);
+ }
+
+ pub fn getEvents(self: *Poll) i32 {
+ return us_poll_events(self);
+ }
+
pub fn data(self: *Poll, comptime Data: type) *Data {
return us_poll_ext(self).?;
}
diff --git a/src/http/zlib.zig b/src/http/zlib.zig
index e827dc1b3..8144930c2 100644
--- a/src/http/zlib.zig
+++ b/src/http/zlib.zig
@@ -11,7 +11,6 @@ fn initMutableString(allocator: std.mem.Allocator) anyerror!MutableString {
}
const BufferPool = bun.ObjectPool(MutableString, initMutableString, false, 4);
-
pub fn get(allocator: std.mem.Allocator) *MutableString {
return &BufferPool.get(allocator).data;
}
diff --git a/src/http_client_async.zig b/src/http_client_async.zig
index 4998cec85..725e960d6 100644
--- a/src/http_client_async.zig
+++ b/src/http_client_async.zig
@@ -30,6 +30,7 @@ const ObjectPool = @import("./pool.zig").ObjectPool;
const SOCK = os.SOCK;
const Arena = @import("./mimalloc_arena.zig").Arena;
const ZlibPool = @import("./http/zlib.zig");
+
const URLBufferPool = ObjectPool([4096]u8, null, false, 10);
const uws = bun.uws;
pub const MimeType = @import("./http/mime_type.zig");
@@ -723,6 +724,11 @@ pub const HTTPThread = struct {
this.loop.wakeup();
}
+ pub fn wakeup(this: *@This()) void {
+ if (this.has_awoken.load(.Monotonic))
+ this.loop.wakeup();
+ }
+
pub fn schedule(this: *@This(), batch: Batch) void {
if (batch.len == 0)
return;
@@ -808,11 +814,12 @@ pub fn onClose(
// if the peer closed after a full chunk, treat this
// as if the transfer had completed; browsers appear to ignore
// a missing 0\r\n chunk
- if (in_progress and client.state.transfer_encoding == .chunked) {
+ if (in_progress and client.state.isChunkedEncoding()) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
var buf = client.state.getBodyBuffer();
if (buf.list.items.len > 0) {
- client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
+ client.state.received_last_chunk = true;
+ client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
@@ -988,14 +995,17 @@ pub const InternalState = struct {
response_message_buffer: MutableString = undefined,
pending_response: picohttp.Response = undefined,
allow_keepalive: bool = true,
+ received_last_chunk: bool = false,
transfer_encoding: Encoding = Encoding.identity,
encoding: Encoding = Encoding.identity,
content_encoding_i: u8 = std.math.maxInt(u8),
chunked_decoder: picohttp.phr_chunked_decoder = .{},
+ zlib_reader: ?*Zlib.ZlibReaderArrayList = null,
stage: Stage = Stage.pending,
body_out_str: ?*MutableString = null,
compressed_body: MutableString = undefined,
- body_size: usize = 0,
+ content_length: ?usize = null,
+ total_body_received: usize = 0,
request_body: []const u8 = "",
original_request_body: HTTPRequestBody = .{ .bytes = "" },
request_sent_len: usize = 0,
@@ -1015,12 +1025,20 @@ pub const InternalState = struct {
};
}
+ pub fn isChunkedEncoding(this: *InternalState) bool {
+ return this.transfer_encoding == Encoding.chunked;
+ }
+
pub fn reset(this: *InternalState) void {
this.compressed_body.deinit();
this.response_message_buffer.deinit();
var body_msg = this.body_out_str;
if (body_msg) |body| body.reset();
+ if (this.zlib_reader) |reader| {
+ this.zlib_reader = null;
+ reader.deinit();
+ }
this.* = .{
.body_out_str = body_msg,
@@ -1042,27 +1060,78 @@ pub const InternalState = struct {
}
}
- fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void {
- defer this.compressed_body.deinit();
+ fn isDone(this: *InternalState) bool {
+ if (this.isChunkedEncoding()) {
+ return this.received_last_chunk;
+ }
+
+ if (this.content_length) |content_length| {
+ return this.total_body_received >= content_length;
+ }
+ // TODO: to handle Content-Type: text/event-stream, in the future we should only be done on connection Close/End/Timeout
+ return true;
+ }
+
+ fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
+ defer this.compressed_body.reset();
var gzip_timer: std.time.Timer = undefined;
if (extremely_verbose)
gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
- body_out_str.list.expandToCapacity();
+ var reader: *Zlib.ZlibReaderArrayList = undefined;
+ if (this.zlib_reader) |current_reader| {
+ reader = current_reader;
+ reader.zlib.next_in = buffer.ptr;
+ reader.zlib.avail_in = @as(u32, @truncate(buffer.len));
+
+ reader.list = body_out_str.list;
+ const initial = body_out_str.list.items.len;
+ body_out_str.list.expandToCapacity();
+ if (body_out_str.list.capacity == initial) {
+ try body_out_str.list.ensureUnusedCapacity(body_out_str.allocator, 4096);
+ body_out_str.list.expandToCapacity();
+ }
+ reader.zlib.next_out = &body_out_str.list.items[initial];
+ reader.zlib.avail_out = @as(u32, @truncate(body_out_str.list.capacity - initial));
+ // we reset the total out so we can track how much we decompressed this time
+ reader.zlib.total_out = initial;
+ } else {
+ reader = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator(
+ buffer,
+ &body_out_str.list,
+ body_out_str.allocator,
+ default_allocator,
+ .{
+ // TODO: add br support; today we support gzip and deflate only
+ // zlib.MAX_WBITS = 15
+ // to (de-)compress deflate format, use wbits = -zlib.MAX_WBITS
+ // to (de-)compress zlib format, use wbits = zlib.MAX_WBITS
+ // to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
+ .windowBits = if (this.encoding == Encoding.gzip) Zlib.MAX_WBITS | 16 else -Zlib.MAX_WBITS,
+ },
+ );
+ this.zlib_reader = reader;
+ }
- ZlibPool.decompress(buffer.list.items, body_out_str, default_allocator) catch |err| {
- Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
- Output.flush();
- return err;
+ reader.readAll() catch |err| {
+ if (this.isDone() or error.ShortRead != err) {
+ Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{bun.asByteSlice(@errorName(err))});
+ Output.flush();
+ return err;
+ }
};
if (extremely_verbose)
this.gzip_elapsed = gzip_timer.read();
}
- pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void {
+ fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void {
+ try this.decompressConst(buffer.list.items, body_out_str);
+ }
+
+ pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !usize {
var body_out_str = this.body_out_str.?;
switch (this.encoding) {
@@ -1080,10 +1149,10 @@ pub const InternalState = struct {
},
}
- this.postProcessBody(body_out_str);
+ return this.postProcessBody();
}
- pub fn postProcessBody(this: *InternalState, body_out_str: *MutableString) void {
+ pub fn postProcessBody(this: *InternalState) usize {
var response = &this.pending_response;
// if the body was compressed with this header, it no longer is
if (this.content_encoding_i < response.headers.len) {
@@ -1093,7 +1162,7 @@ pub const InternalState = struct {
this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i));
}
- this.body_size = @as(usize, @truncate(body_out_str.list.items.len));
+ return this.body_out_str.?.list.items.len;
}
};
@@ -1119,7 +1188,7 @@ disable_keepalive: bool = false,
state: InternalState = .{},
-completion_callback: HTTPClientResult.Callback = undefined,
+result_callback: HTTPClientResult.Callback = undefined,
/// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since.
/// This is a workaround for that.
@@ -1135,7 +1204,20 @@ proxy_tunnel: ?ProxyTunnel = null,
aborted: ?*std.atomic.Atomic(bool) = null,
async_http_id: u32 = 0,
hostname: ?[]u8 = null,
-pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*std.atomic.Atomic(bool), hostname: ?[]u8) HTTPClient {
+signal_header_progress: *std.atomic.Atomic(bool),
+enable_body_stream: *std.atomic.Atomic(bool),
+
+pub fn init(
+ allocator: std.mem.Allocator,
+ method: Method,
+ url: URL,
+ header_entries: Headers.Entries,
+ header_buf: string,
+ signal: ?*std.atomic.Atomic(bool),
+ hostname: ?[]u8,
+ signal_header_progress: *std.atomic.Atomic(bool),
+ enable_body_stream: *std.atomic.Atomic(bool),
+) HTTPClient {
return HTTPClient{
.allocator = allocator,
.method = method,
@@ -1144,6 +1226,8 @@ pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entri
.header_buf = header_buf,
.aborted = signal,
.hostname = hostname,
+ .signal_header_progress = signal_header_progress,
+ .enable_body_stream = enable_body_stream,
};
}
@@ -1160,9 +1244,6 @@ pub fn deinit(this: *HTTPClient) void {
tunnel.deinit();
this.proxy_tunnel = null;
}
-
- this.state.compressed_body.deinit();
- this.state.response_message_buffer.deinit();
}
const Stage = enum(u8) {
@@ -1286,7 +1367,7 @@ pub const AsyncHTTP = struct {
next: ?*AsyncHTTP = null,
task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP },
- completion_callback: HTTPClientResult.Callback = undefined,
+ result_callback: HTTPClientResult.Callback = undefined,
/// Timeout in nanoseconds
timeout: usize = 0,
@@ -1303,6 +1384,9 @@ pub const AsyncHTTP = struct {
elapsed: u64 = 0,
gzip_elapsed: u64 = 0,
+ signal_header_progress: std.atomic.Atomic(bool),
+ enable_body_stream: std.atomic.Atomic(bool),
+
pub var active_requests_count = std.atomic.Atomic(usize).init(0);
pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256);
@@ -1332,6 +1416,16 @@ pub const AsyncHTTP = struct {
}
}
+ pub fn signalHeaderProgress(this: *AsyncHTTP) void {
+ @fence(.Release);
+ this.client.signal_header_progress.store(true, .Release);
+ }
+
+ pub fn enableBodyStreaming(this: *AsyncHTTP) void {
+ @fence(.Release);
+ this.client.enable_body_stream.store(true, .Release);
+ }
+
pub fn clearData(this: *AsyncHTTP) void {
this.response_headers.deinit(this.allocator);
this.response_headers = .{};
@@ -1371,12 +1465,14 @@ pub const AsyncHTTP = struct {
.request_header_buf = headers_buf,
.request_body = .{ .bytes = request_body },
.response_buffer = response_buffer,
- .completion_callback = callback,
+ .result_callback = callback,
.http_proxy = http_proxy,
.async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0,
+ .signal_header_progress = std.atomic.Atomic(bool).init(false),
+ .enable_body_stream = std.atomic.Atomic(bool).init(false),
};
- this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname);
+ this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname, &this.signal_header_progress, &this.enable_body_stream);
this.client.async_http_id = this.async_http_id;
this.client.timeout = timeout;
this.client.http_proxy = this.http_proxy;
@@ -1537,7 +1633,7 @@ pub const AsyncHTTP = struct {
var ctx = try bun.default_allocator.create(SingleHTTPChannel);
ctx.* = SingleHTTPChannel.init();
- this.completion_callback = HTTPClientResult.Callback.New(
+ this.result_callback = HTTPClientResult.Callback.New(
*SingleHTTPChannel,
sendSyncCallback,
).init(ctx);
@@ -1557,12 +1653,10 @@ pub const AsyncHTTP = struct {
unreachable;
}
- pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void {
+ pub fn onAsyncHTTPCallback(this: *AsyncHTTP, result: HTTPClientResult) void {
std.debug.assert(this.real != null);
- const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
- std.debug.assert(active_requests > 0);
- var completion = this.completion_callback;
+ var callback = this.result_callback;
this.elapsed = http_thread.timer.read() -| this.elapsed;
this.redirected = this.client.remaining_redirect_count != default_redirect_count;
if (!result.isSuccess()) {
@@ -1574,19 +1668,27 @@ pub const AsyncHTTP = struct {
this.response = result.response;
this.state.store(.success, .Monotonic);
}
- this.client.deinit();
- this.real.?.* = this.*;
- this.real.?.response_buffer = this.response_buffer;
+ if (result.has_more) {
+ callback.function(callback.ctx, result);
+ } else {
+ this.client.deinit();
+
+ this.real.?.* = this.*;
+ this.real.?.response_buffer = this.response_buffer;
- log("onAsyncHTTPComplete: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
+ log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)});
- default_allocator.destroy(this);
+ default_allocator.destroy(this);
- completion.function(completion.ctx, result);
+ const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic);
+ std.debug.assert(active_requests > 0);
- if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
- http_thread.drainEvents();
+ callback.function(callback.ctx, result);
+
+ if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) {
+ http_thread.drainEvents();
+ }
}
}
@@ -1599,7 +1701,7 @@ pub const AsyncHTTP = struct {
_ = active_requests_count.fetchAdd(1, .Monotonic);
this.err = null;
this.state.store(.sending, .Monotonic);
- this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init(
+ this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init(
this,
);
@@ -2153,6 +2255,7 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP
}
pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+ log("onData {}", .{incoming_data.len});
if (this.hasSignalAborted()) {
this.closeAndAbort(is_ssl, socket);
return;
@@ -2241,7 +2344,11 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
this.cloneMetadata();
if (!can_continue) {
- this.done(is_ssl, ctx, socket);
+ // if chunked but no body is expected, we mark the last chunk
+ this.state.received_last_chunk = true;
+ // if it is not, we ignore the content_length
+ this.state.content_length = 0;
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
@@ -2251,35 +2358,45 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
if (body_buf.len == 0) {
+ // no body data yet, but we can report the headers
+ if (this.signal_header_progress.load(.Acquire)) {
+ this.progressUpdate(is_ssl, ctx, socket);
+ }
return;
}
if (this.state.response_stage == .body) {
{
- const is_done = this.handleResponseBody(body_buf, true) catch |err| {
+ const report_progress = this.handleResponseBody(body_buf, true) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
}
} else if (this.state.response_stage == .body_chunk) {
this.setTimeout(socket, 500);
{
- const is_done = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| {
+ const report_progress = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
}
}
+
+ // if not reported yet, we report the partial progress now
+ if (this.signal_header_progress.load(.Acquire)) {
+ this.progressUpdate(is_ssl, ctx, socket);
+ return;
+ }
},
.body => {
@@ -2295,23 +2412,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
defer data.deinit();
const decoded_data = data.slice();
if (decoded_data.len == 0) return;
- const is_done = this.handleResponseBody(decoded_data, false) catch |err| {
+ const report_progress = this.handleResponseBody(decoded_data, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
} else {
- const is_done = this.handleResponseBody(incoming_data, false) catch |err| {
+ const report_progress = this.handleResponseBody(incoming_data, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
}
@@ -2331,23 +2448,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
const decoded_data = data.slice();
if (decoded_data.len == 0) return;
- const is_done = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| {
+ const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
} else {
- const is_done = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| {
+ const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
- if (is_done) {
- this.done(is_ssl, ctx, socket);
+ if (report_progress) {
+ this.progressUpdate(is_ssl, ctx, socket);
return;
}
}
@@ -2403,7 +2520,7 @@ fn fail(this: *HTTPClient, err: anyerror) void {
this.state.fail = err;
this.state.stage = .fail;
- const callback = this.completion_callback;
+ const callback = this.result_callback;
const result = this.toResult(this.cloned_metadata);
this.state.reset();
this.proxy_tunneling = false;
@@ -2440,39 +2557,50 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void {
socket.timeout(amount);
}
-pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
+pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void {
if (this.state.stage != .done and this.state.stage != .fail) {
- if (this.aborted != null) {
+ const is_done = this.state.isDone();
+
+ if (this.aborted != null and is_done) {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
- log("done", .{});
+ log("progressUpdate {}", .{is_done});
var out_str = this.state.body_out_str.?;
var body = out_str.*;
this.cloned_metadata.response = this.state.pending_response;
const result = this.toResult(this.cloned_metadata);
- const callback = this.completion_callback;
+ const callback = this.result_callback;
- socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
+ if (is_done) {
+ socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr());
- if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) {
- ctx.releaseSocket(
- socket,
- this.connected_url.hostname,
- this.connected_url.getPortAuto(),
- );
- } else if (!socket.isClosed()) {
- socket.close(0, null);
+ if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) {
+ ctx.releaseSocket(
+ socket,
+ this.connected_url.hostname,
+ this.connected_url.getPortAuto(),
+ );
+ } else if (!socket.isClosed()) {
+ socket.close(0, null);
+ }
+ this.state.reset();
+ this.state.response_stage = .done;
+ this.state.request_stage = .done;
+ this.state.stage = .done;
+ this.proxy_tunneling = false;
+ if (comptime print_every > 0) {
+ print_every_i += 1;
+ if (print_every_i % print_every == 0) {
+ Output.prettyln("Heap stats for HTTP thread\n", .{});
+ Output.flush();
+ default_arena.dumpThreadStats();
+ print_every_i = 0;
+ }
+ }
}
-
- this.state.reset();
result.body.?.* = body;
- std.debug.assert(this.state.stage != .done);
- this.state.response_stage = .done;
- this.state.request_stage = .done;
- this.state.stage = .done;
- this.proxy_tunneling = false;
if (comptime print_every > 0) {
print_every_i += 1;
if (print_every_i % print_every == 0) {
@@ -2494,6 +2622,19 @@ pub const HTTPClientResult = struct {
fail: anyerror = error.NoError,
redirected: bool = false,
headers_buf: []picohttp.Header = &.{},
+ has_more: bool = false,
+
+ /// For HTTP client requests:
+ /// when Content-Length is provided, this represents the whole size of the body.
+ /// If chunked encoded, this represents the total received size (ignoring the chunk headers).
+ /// If it is not chunked encoded and Content-Length is not provided, this will be unknown.
+ body_size: BodySize = .unknown,
+
+ pub const BodySize = union(enum) {
+ total_received: usize,
+ content_length: usize,
+ unknown: void,
+ };
pub fn isSuccess(this: *const HTTPClientResult) bool {
return this.fail == error.NoError;
@@ -2547,6 +2688,12 @@ pub const HTTPClientResult = struct {
};
pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult {
+ const body_size: HTTPClientResult.BodySize = if (this.state.isChunkedEncoding())
+ .{ .total_received = this.state.total_body_received }
+ else if (this.state.content_length) |content_length|
+ .{ .content_length = content_length }
+ else
+ .{ .unknown = {} };
return HTTPClientResult{
.body = this.state.body_out_str,
.response = metadata.response,
@@ -2555,6 +2702,8 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes
.href = metadata.url,
.fail = this.state.fail,
.headers_buf = metadata.response.headers,
+ .has_more = this.state.fail == error.NoError and !this.state.isDone(),
+ .body_size = body_size,
};
}
@@ -2566,10 +2715,10 @@ const preallocate_max = 1024 * 1024 * 256;
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
std.debug.assert(this.state.transfer_encoding == .identity);
-
+ const content_length = this.state.content_length orelse 0;
// is it exactly as much as we need?
- if (is_only_buffer and incoming_data.len >= this.state.body_size) {
- try handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]);
+ if (is_only_buffer and incoming_data.len >= content_length) {
+ try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]);
return true;
} else {
return handleResponseBodyFromMultiplePackets(this, incoming_data);
@@ -2577,6 +2726,10 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_
}
fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void {
+ if (!this.state.isChunkedEncoding()) {
+ this.state.total_body_received += incoming_data.len;
+ }
+
if (this.state.encoding.isCompressed()) {
var body_buffer = this.state.body_out_str.?;
if (body_buffer.list.capacity == 0) {
@@ -2584,7 +2737,7 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
try body_buffer.growBy(@max(@as(usize, @intFromFloat(min)), 32));
}
- try ZlibPool.decompress(incoming_data, body_buffer, default_allocator);
+ try this.state.decompressConst(incoming_data, body_buffer);
} else {
try this.state.getBodyBuffer().appendSliceExact(incoming_data);
}
@@ -2605,42 +2758,44 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
progress.context.maybeRefresh();
}
- this.state.postProcessBody(this.state.getBodyBuffer());
+ _ = this.state.postProcessBody();
}
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer();
+ const content_length = this.state.content_length orelse 0;
if (buffer.list.items.len == 0 and
- this.state.body_size > 0 and this.state.body_size < preallocate_max)
+ content_length > 0 and incoming_data.len < preallocate_max)
{
- // since we don't do streaming yet, we might as well just allocate the whole thing
- // when we know the expected size
- buffer.list.ensureTotalCapacityPrecise(buffer.allocator, this.state.body_size) catch {};
+ buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
}
- const remaining_content_length = this.state.body_size -| buffer.list.items.len;
+ const remaining_content_length = content_length -| this.state.total_body_received;
var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
_ = try buffer.write(remainder);
+ this.state.total_body_received += remainder.len;
+
if (this.progress_node) |progress| {
progress.activate();
- progress.setCompletedItems(buffer.list.items.len);
+ progress.setCompletedItems(this.state.total_body_received);
progress.context.maybeRefresh();
}
- if (buffer.list.items.len == this.state.body_size) {
- try this.state.processBodyBuffer(buffer.*);
+ // done or streaming
+ const is_done = this.state.total_body_received >= content_length;
+ if (is_done or this.enable_body_stream.load(.Acquire)) {
+ const processed = try this.state.processBodyBuffer(buffer.*);
if (this.progress_node) |progress| {
progress.activate();
- progress.setCompletedItems(buffer.list.items.len);
+ progress.setCompletedItems(this.state.total_body_received);
progress.context.maybeRefresh();
}
- return true;
+ return is_done or processed > 0;
}
-
return false;
}
@@ -2676,6 +2831,8 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
&bytes_decoded,
);
buffer.list.items.len -|= incoming_data.len - bytes_decoded;
+ this.state.total_body_received += bytes_decoded;
+
buffer_.* = buffer;
switch (pret) {
@@ -2690,12 +2847,18 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets(
progress.setCompletedItems(buffer.list.items.len);
progress.context.maybeRefresh();
}
+ // streaming chunks
+ if (this.enable_body_stream.load(.Acquire)) {
+ const processed = try this.state.processBodyBuffer(buffer);
+ return processed > 0;
+ }
return false;
},
// Done
else => {
- try this.state.processBodyBuffer(
+ this.state.received_last_chunk = true;
+ _ = try this.state.processBodyBuffer(
buffer,
);
@@ -2746,6 +2909,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
&bytes_decoded,
);
buffer.len -|= incoming_data.len - bytes_decoded;
+ this.state.total_body_received += bytes_decoded;
switch (pret) {
// Invalid HTTP response body
@@ -2759,12 +2923,21 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
progress.setCompletedItems(buffer.len);
progress.context.maybeRefresh();
}
- try this.state.getBodyBuffer().appendSliceExact(buffer);
+ const body_buffer = this.state.getBodyBuffer();
+ try body_buffer.appendSliceExact(buffer);
+
+ // streaming chunks
+ if (this.enable_body_stream.load(.Acquire)) {
+ const processed = try this.state.processBodyBuffer(body_buffer.*);
+ return processed > 0;
+ }
return false;
},
// Done
else => {
+ this.state.received_last_chunk = true;
+
try this.handleResponseBodyFromSinglePacket(buffer);
std.debug.assert(this.state.body_out_str.?.list.items.ptr != buffer.ptr);
if (this.progress_node) |progress| {
@@ -2790,8 +2963,13 @@ pub fn handleResponseMetadata(
for (response.headers, 0..) |header, header_i| {
switch (hashHeaderName(header.name)) {
hashHeaderConst("Content-Length") => {
- const content_length = std.fmt.parseInt(@TypeOf(this.state.body_size), header.value, 10) catch 0;
- this.state.body_size = content_length;
+ const content_length = std.fmt.parseInt(usize, header.value, 10) catch 0;
+ if (this.method.hasBody()) {
+ this.state.content_length = content_length;
+ } else {
+ // ignore body size for HEAD requests
+ this.state.content_length = 0;
+ }
},
hashHeaderConst("Content-Encoding") => {
if (strings.eqlComptime(header.value, "gzip")) {
@@ -2968,6 +3146,7 @@ pub fn handleResponseMetadata(
// if there is no redirect, or redirect == "manual", just proceed
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
+ const content_length = this.state.content_length orelse 0;
// if no body is expected we should stop processing
- return this.method.hasBody() and (this.state.body_size > 0 or this.state.transfer_encoding == .chunked);
+ return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked);
}
diff --git a/src/zlib.zig b/src/zlib.zig
index d79965a2a..b578f0ede 100644
--- a/src/zlib.zig
+++ b/src/zlib.zig
@@ -3,6 +3,8 @@
const std = @import("std");
const bun = @import("root").bun;
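+// zlib's maximum/default window bits: a 2^15 (32 KiB) history window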
+pub const MAX_WBITS = 15;
+
test "Zlib Read" {
const expected_text = @embedFile("./zlib.test.txt");
const input = bun.asByteSlice(@embedFile("./zlib.test.gz"));
@@ -525,7 +527,11 @@ pub const ZlibReaderArrayList = struct {
pub fn readAll(this: *ZlibReader) ZlibError!void {
defer {
- this.list.shrinkRetainingCapacity(this.zlib.total_out);
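+ // keep the list length in sync with the bytes zlib actually wrote: shrink when over-allocated, extend within capacity otherwise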
+ if (this.list.items.len > this.zlib.total_out) {
+ this.list.shrinkRetainingCapacity(this.zlib.total_out);
+ } else if (this.zlib.total_out < this.list.capacity) {
+ this.list.items.len = this.zlib.total_out;
+ }
this.list_ptr.* = this.list;
}
diff --git a/test/js/web/fetch/fetch-gzip.test.ts b/test/js/web/fetch/fetch-gzip.test.ts
index 32888947b..0569eaad8 100644
--- a/test/js/web/fetch/fetch-gzip.test.ts
+++ b/test/js/web/fetch/fetch-gzip.test.ts
@@ -86,7 +86,7 @@ it("fetch() with a protocol-relative redirect that returns a buffered gzip respo
server.stop();
});
-it("fetch() with a gzip response works (one chunk, streamed, with a delay", async () => {
+it("fetch() with a gzip response works (one chunk, streamed, with a delay)", async () => {
var server = Bun.serve({
port: 0,
@@ -121,7 +121,7 @@ it("fetch() with a gzip response works (one chunk, streamed, with a delay", asyn
server.stop();
});
-it("fetch() with a gzip response works (multiple chunks, TCP server", async done => {
+it("fetch() with a gzip response works (multiple chunks, TCP server)", async done => {
const compressed = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
var socketToClose!: Socket;
const server = Bun.listen({
diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts
new file mode 100644
index 000000000..efef6a161
--- /dev/null
+++ b/test/js/web/fetch/fetch.stream.test.ts
@@ -0,0 +1,1129 @@
+import { Socket, Server, TCPSocketListener } from "bun";
+import { readFileSync } from "fs";
+import { join } from "path";
+import { describe, expect, it } from "bun:test";
+import { gcTick } from "harness";
+
+const fixtures = {
+ "fixture": readFileSync(join(import.meta.dir, "fixture.html")),
+ "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")),
+ "fixture.png.gz": readFileSync(join(import.meta.dir, "fixture.png.gz")),
+};
+
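+// 0xc0 is never a valid standalone UTF-8 byte, so any lossy text decoding along the way would corrupt it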
+const invalid = Buffer.from([0xc0]);
+
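+// 1 MiB payload, large enough that the response must arrive split across many chunks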
+const bigText = Buffer.from("a".repeat(1 * 1024 * 1024));
+const smallText = Buffer.from("Hello".repeat(16));
+const empty = Buffer.alloc(0);
+
+describe("fetch() with streaming", () => {
+ it("stream still works after response get out of scope", async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Hello, world!\n".repeat(5);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = Buffer.from(content, "utf8");
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 5));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ { status: 200, headers: { "Content-Type": "text/plain" } },
+ );
+ },
+ });
+
+ async function getReader() {
+ return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader();
+ }
+ gcTick(false);
+ const reader = await getReader();
+ gcTick(false);
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ parts++;
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBeGreaterThan(1);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ it("response inspected size should reflect stream state", async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Bun!\n".repeat(4);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = Buffer.from(content, "utf8");
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 4));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ { status: 200, headers: { "Content-Type": "text/plain" } },
+ );
+ },
+ });
+
+ function inspectBytes(response: Response) {
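+ // Bun.inspect(response) renders e.g. "Response (42 bytes)"; extract that byte count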
+ const match = /Response \(([0-9]+ )bytes\)/g.exec(
+ Bun.inspect(response, {
+ depth: 0,
+ }),
+ );
+ if (!match) return 0;
+ return parseInt(match[1]?.trim(), 10);
+ }
+
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+ gcTick(false);
+ let size = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ size += value.length;
+ }
+ expect(inspectBytes(res)).toBe(size);
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ it("can handle multiple simultaneos requests", async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Hello, world!\n".repeat(5);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = Buffer.from(content, "utf8");
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 5));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ },
+ },
+ );
+ },
+ });
+
+ const server_url = `http://${server.hostname}:${server.port}`;
+ async function doRequest() {
+ await Bun.sleep(10);
+ const res = await fetch(server_url);
+ const reader = res.body?.getReader();
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ parts++;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBeGreaterThan(1);
+ }
+
+ await Promise.all([doRequest(), doRequest(), doRequest(), doRequest(), doRequest(), doRequest()]);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ it(`can handle transforms`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Hello, world!\n".repeat(5);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = Buffer.from(content, "utf8");
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 5));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ },
+ },
+ );
+ },
+ });
+
+ const server_url = `http://${server.hostname}:${server.port}`;
+ const res = await fetch(server_url);
+
+ const transform = new TransformStream({
+ transform(chunk, controller) {
+ controller.enqueue(Buffer.from(chunk).toString("utf8").toUpperCase());
+ },
+ });
+
+ const reader = res.body?.pipeThrough(transform).getReader();
+
+ let result = "";
+ while (true) {
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ result += value;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(result).toBe(content.toUpperCase());
+ } finally {
+ server?.stop();
+ }
+ });
+
+ it(`can handle gz images`, async () => {
+ let server: Server | null = null;
+ try {
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ const data = fixtures["fixture.png.gz"];
+ return new Response(data, {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ "Content-Encoding": "gzip",
+ },
+ });
+ },
+ });
+
+ const server_url = `http://${server.hostname}:${server.port}`;
+ const res = await fetch(server_url);
+
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ while (true) {
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer).toEqual(fixtures["fixture.png"]);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ it(`can proxy fetch with Bun.serve`, async () => {
+ let server: Server | null = null;
+ let server_original: Server | null = null;
+ try {
+ const content = "a".repeat(64 * 1024);
+
+ server_original = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = Buffer.from(content, "utf8");
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 5));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ },
+ },
+ );
+ },
+ });
+
+ server = Bun.serve({
+ port: 0,
+ async fetch(req) {
+ const response = await fetch(`http://${server_original.hostname}:${server_original.port}`, {});
+ await Bun.sleep(10);
+ return new Response(response.body, {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ },
+ });
+ },
+ });
+
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ parts++;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBeGreaterThanOrEqual(1);
+ } finally {
+ server?.stop();
+ server_original?.stop();
+ }
+ });
+ const matrix = [
+ { name: "small", data: fixtures["fixture"] },
+ { name: "small text", data: smallText },
+ { name: "big text", data: bigText },
+ { name: "img", data: fixtures["fixture.png"] },
+ { name: "empty", data: empty },
+ ];
+ for (let i = 0; i < matrix.length; i++) {
+ const fixture = matrix[i];
+ for (let j = 0; j < matrix.length; j++) {
+ const fixtureb = matrix[j];
+ const test = fixture.name == "empty" && fixtureb.name == "empty" ? it.todo : it;
+ test(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => {
+ let server: Server | null = null;
+ try {
+ //@ts-ignore
+ const data = fixture.data;
+ //@ts-ignore
+ const data_b = fixtureb.data;
+ const content = Buffer.concat([data, data_b]);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ controller.write(data);
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data_b);
+ await controller.flush();
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ },
+ },
+ );
+ },
+ });
+
+ const server_url = `http://${server.hostname}:${server.port}`;
+ const res = await fetch(server_url);
+ const reader = res.body?.getReader();
+ let buffer = Buffer.alloc(0);
+ while (true) {
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ if (done) {
+ break;
+ }
+ }
+ gcTick(false);
+ expect(buffer).toEqual(content);
+ } finally {
+ server?.stop();
+ }
+ });
+ }
+ }
+
+ type CompressionType = "no" | "gzip" | "deflate" | "br";
+ type TestType = { headers: Record<string, string>; compression: CompressionType; skip?: boolean };
+ const types: Array<TestType> = [
+ { headers: {}, compression: "no" },
+ { headers: { "Content-Encoding": "gzip" }, compression: "gzip" },
+ { headers: { "Content-Encoding": "deflate" }, compression: "deflate" },
+ // { headers: { "Content-Encoding": "br" }, compression: "br", skip: true }, // not implemented yet
+ ];
+
+ function compress(compression: CompressionType, data: Uint8Array) {
+ switch (compression) {
+ case "gzip":
+ return Bun.gzipSync(data);
+ case "deflate":
+ return Bun.deflateSync(data);
+ default:
+ return data;
+ }
+ }
+
+ for (const { headers, compression, skip } of types) {
+ const test = skip ? it.skip : it;
+
+ test(`with invalid utf8 with ${compression} compression`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = compress(compression, content);
+ const size = data.byteLength / 4;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 4));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ ...headers,
+ },
+ },
+ );
+ },
+ });
+
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer).toEqual(content);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ test(`chunked response works (single chunk) with ${compression} compression`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Hello, world!\n".repeat(5);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = compress(compression, Buffer.from(content, "utf8"));
+ controller.write(data);
+ await controller.flush();
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ ...headers,
+ },
+ },
+ );
+ },
+ });
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const result = await res.text();
+ gcTick(false);
+ expect(result).toBe(content);
+
+ res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ parts++;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBe(1);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ test(`chunked response works (multiple chunks) with ${compression} compression`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = "Hello, world!\n".repeat(5);
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(
+ new ReadableStream({
+ type: "direct",
+ async pull(controller) {
+ const data = compress(compression, Buffer.from(content, "utf8"));
+ const size = data.byteLength / 5;
+ controller.write(data.slice(0, size));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size, size * 2));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 2, size * 3));
+ await controller.flush();
+ await Bun.sleep(100);
+ controller.write(data.slice(size * 3, size * 5));
+ await controller.flush();
+
+ controller.close();
+ },
+ }),
+ {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ ...headers,
+ },
+ },
+ );
+ },
+ });
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const result = await res.text();
+ gcTick(false);
+ expect(result).toBe(content);
+
+ res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ parts++;
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBeGreaterThan(1);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ test(`Content-Length response works (single part) with ${compression} compression`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = "a".repeat(1024);
+
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(compress(compression, Buffer.from(content)), {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ ...headers,
+ },
+ });
+ },
+ });
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const result = await res.text();
+ gcTick(false);
+ expect(result).toBe(content);
+
+ res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ parts++;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBe(1);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => {
+ let server: Server | null = null;
+ try {
+ const content = "a".repeat(64 * 1024);
+
+ server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ return new Response(compress(compression, Buffer.from(content)), {
+ status: 200,
+ headers: {
+ "Content-Type": "text/plain",
+ ...headers,
+ },
+ });
+ },
+ });
+ let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const result = await res.text();
+ gcTick(false);
+ expect(result).toBe(content);
+
+ res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ let parts = 0;
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ parts++;
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ expect(parts).toBeGreaterThan(1);
+ } finally {
+ server?.stop();
+ }
+ });
+
+ test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {
+ let server: TCPSocketListener<any> | null = null;
+
+ try {
+ const parts = 5;
+ const content = "Hello".repeat(parts);
+
+ server = Bun.listen({
+ port: 0,
+ hostname: "0.0.0.0",
+ socket: {
+ async open(socket) {
+ var corked: any[] = [];
+ var cork = true;
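+ // writes queue up while corked; the first uncorked write flushes the backlog before the new chunk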
+ async function write(chunk: any) {
+ await new Promise<void>((resolve, reject) => {
+ if (cork) {
+ corked.push(chunk);
+ }
+
+ if (!cork && corked.length) {
+ socket.write(corked.join(""));
+ corked.length = 0;
+ socket.flush();
+ }
+
+ if (!cork) {
+ socket.write(chunk);
+ socket.flush();
+ }
+
+ resolve();
+ });
+ }
+ const compressed = compress(compression, Buffer.from(content, "utf8"));
+ await write("HTTP/1.1 200 OK\r\n");
+ await write("Content-Type: text/plain\r\n");
+ for (const [key, value] of Object.entries(headers)) {
+ await write(key + ": " + value + "\r\n");
+ }
+ await write("Content-Length: " + compressed.byteLength + "\r\n");
+ await write("\r\n");
+ const size = compressed.byteLength / 5;
+ for (var i = 0; i < 5; i++) {
+ cork = false;
+ await write(compressed.slice(size * i, size * (i + 1)));
+ }
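+ // bytes past the advertised Content-Length should be ignored by the client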
+ await write("Extra Data!");
+ await write("Extra Data!");
+ socket.flush();
+ },
+ drain(socket) {},
+ },
+ });
+
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe(content);
+ } finally {
+ server?.stop(true);
+ }
+ });
+
+ test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {
+ let server: TCPSocketListener<any> | null = null;
+
+ try {
+ const parts = 5;
+ const content = "Hello".repeat(parts);
+
+ server = Bun.listen({
+ port: 0,
+ hostname: "0.0.0.0",
+ socket: {
+ async open(socket) {
+ var corked: any[] = [];
+ var cork = true;
+ async function write(chunk: any) {
+ await new Promise<void>((resolve, reject) => {
+ if (cork) {
+ corked.push(chunk);
+ }
+
+ if (!cork && corked.length) {
+ socket.write(corked.join(""));
+ corked.length = 0;
+ socket.flush();
+ }
+
+ if (!cork) {
+ socket.write(chunk);
+ socket.flush();
+ }
+
+ resolve();
+ });
+ }
+ const compressed = compress(compression, Buffer.from(content, "utf8"));
+ await write("HTTP/1.1 200 OK\r\n");
+ await write("Content-Type: text/plain\r\n");
+ for (const [key, value] of Object.entries(headers)) {
+ await write(key + ": " + value + "\r\n");
+ }
+ // advertise 10 extra bytes that we will never send
+ await write("Content-Length: " + (compressed.byteLength + 10) + "\r\n");
+ await write("\r\n");
+ const size = compressed.byteLength / 5;
+ for (var i = 0; i < 5; i++) {
+ cork = false;
+ await write(compressed.slice(size * i, size * (i + 1)));
+ }
+ socket.flush();
+ },
+ drain(socket) {},
+ },
+ });
+
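+ // the advertised length is 10 bytes longer than what is sent, so this read must end via the 1s timeout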
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {
+ signal: AbortSignal.timeout(1000),
+ });
+ gcTick(false);
+ try {
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+ while (true) {
+ gcTick(false);
+
+ const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe("unreachable");
+ } catch (err) {
+ expect((err as Error).name).toBe("TimeoutError");
+ }
+ } finally {
+ server?.stop(true);
+ }
+ });
+
+ if (compression !== "no") {
+ test(`can handle corrupted ${compression} compression`, async () => {
+ let server: TCPSocketListener<any> | null = null;
+
+ try {
+ const parts = 5;
+ const content = "Hello".repeat(parts);
+ server = Bun.listen({
+ port: 0,
+ hostname: "0.0.0.0",
+ socket: {
+ async open(socket) {
+ var corked: any[] = [];
+ var cork = true;
+ async function write(chunk: any) {
+ await new Promise<void>((resolve, reject) => {
+ if (cork) {
+ corked.push(chunk);
+ }
+
+ if (!cork && corked.length) {
+ socket.write(corked.join(""));
+ corked.length = 0;
+ socket.flush();
+ }
+
+ if (!cork) {
+ socket.write(chunk);
+ socket.flush();
+ }
+
+ resolve();
+ });
+ }
+ const compressed = compress(compression, Buffer.from(content, "utf8"));
+ await write("HTTP/1.1 200 OK\r\n");
+ await write("Content-Type: text/plain\r\n");
+ for (const [key, value] of Object.entries(headers)) {
+ await write(key + ": " + value + "\r\n");
+ }
+ // Content-Length matches what we send; the payload itself is corrupted below
+ await write("Content-Length: " + compressed.byteLength + "\r\n");
+ await write("\r\n");
+ const size = compressed.byteLength / 5;
+ compressed[0] = 0; // corrupt data
+ cork = false;
+ for (var i = 0; i < 5; i++) {
+ await write(compressed.slice(size * i, size * (i + 1)));
+ }
+ socket.flush();
+ },
+ drain(socket) {},
+ },
+ });
+
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+
+ try {
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+
+ while (true) {
+ gcTick(false);
+ const read_promise = reader?.read();
+ const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>;
+
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe("unreachable");
+ } catch (err) {
+ expect((err as Error).name).toBe("ZlibError");
+ }
+ } finally {
+ server?.stop(true);
+ }
+ });
+ }
+
+ test(`can handle socket close with ${compression} compression`, async () => {
+ let server: TCPSocketListener<any> | null = null;
+
+ try {
+ const parts = 5;
+ const content = "Hello".repeat(parts);
+ const { promise, resolve: resolveSocket } = Promise.withResolvers<Socket>();
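+ // hand the server-side socket to the test body so it can be closed mid-read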
+ server = Bun.listen({
+ port: 0,
+ hostname: "0.0.0.0",
+ socket: {
+ async open(socket) {
+ var corked: any[] = [];
+ var cork = true;
+ async function write(chunk: any) {
+ await new Promise<void>((resolve, reject) => {
+ if (cork) {
+ corked.push(chunk);
+ }
+
+ if (!cork && corked.length) {
+ socket.write(corked.join(""));
+ corked.length = 0;
+ socket.flush();
+ }
+
+ if (!cork) {
+ socket.write(chunk);
+ socket.flush();
+ }
+
+ resolve();
+ });
+ }
+ const compressed = compress(compression, Buffer.from(content, "utf8"));
+ await write("HTTP/1.1 200 OK\r\n");
+ await write("Content-Type: text/plain\r\n");
+ for (const [key, value] of Object.entries(headers)) {
+ await write(key + ": " + value + "\r\n");
+ }
+ // advertise 10 extra bytes that we will never send; the client keeps waiting until the socket closes
+ await write("Content-Length: " + (compressed.byteLength + 10) + "\r\n");
+ await write("\r\n");
+ const size = compressed.byteLength / 5;
+ for (var i = 0; i < 5; i++) {
+ cork = false;
+ await write(compressed.slice(size * i, size * (i + 1)));
+ }
+ socket.flush();
+ resolveSocket(socket);
+ },
+ drain(socket) {},
+ },
+ });
+
+ const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+ gcTick(false);
+
+ let socket: Socket | null = await promise;
+ try {
+ const reader = res.body?.getReader();
+
+ let buffer = Buffer.alloc(0);
+
+ while (true) {
+ gcTick(false);
+ const read_promise = reader?.read();
+ socket?.end();
+ socket = null;
+ const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>;
+
+ if (value) {
+ buffer = Buffer.concat([buffer, value]);
+ }
+
+ if (done) {
+ break;
+ }
+ }
+
+ gcTick(false);
+ expect(buffer.toString("utf8")).toBe("unreachable");
+ } catch (err) {
+ expect((err as Error).name).toBe("ConnectionClosed");
+ }
+ } finally {
+ server?.stop(true);
+ }
+ });
+ }
+});
diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts
index a381cb320..59847dde9 100644
--- a/test/js/web/fetch/fetch.test.ts
+++ b/test/js/web/fetch/fetch.test.ts
@@ -4,7 +4,7 @@ import { chmodSync, mkdtempSync, readFileSync, realpathSync, rmSync, writeFileSy
import { mkfifo } from "mkfifo";
import { tmpdir } from "os";
import { join } from "path";
-import { gc, withoutAggressiveGC } from "harness";
+import { gc, withoutAggressiveGC, gcTick } from "harness";
const tmp_dir = mkdtempSync(join(realpathSync(tmpdir()), "fetch.test"));
@@ -1334,11 +1334,18 @@ it("fetch() file:// works", async () => {
expect(await (await fetch(new URL("fetch.test.ts", import.meta.url))).text()).toEqual(
await Bun.file(Bun.fileURLToPath(new URL("fetch.test.ts", import.meta.url))).text(),
);
- expect(await (await fetch(new URL("file with space in the name.txt", import.meta.url))).text()).toEqual(
- await Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url))).text(),
- );
+ gc(true);
+ var fileResponse = await fetch(new URL("file with space in the name.txt", import.meta.url));
+ gc(true);
+ var fileResponseText = await fileResponse.text();
+ gc(true);
+ var bunFile = Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url)));
+ gc(true);
+ var bunFileText = await bunFile.text();
+ gc(true);
+ expect(fileResponseText).toEqual(bunFileText);
+ gc(true);
});
-
it("cloned response headers are independent before accessing", () => {
const response = new Response("hello", {
headers: {
diff --git a/test/js/web/fetch/fixture.png b/test/js/web/fetch/fixture.png
new file mode 100644
index 000000000..b34dbd29d
--- /dev/null
+++ b/test/js/web/fetch/fixture.png
Binary files differ
diff --git a/test/js/web/fetch/fixture.png.gz b/test/js/web/fetch/fixture.png.gz
new file mode 100644
index 000000000..8c0071811
--- /dev/null
+++ b/test/js/web/fetch/fixture.png.gz
Binary files differ