 misctools/fetch.zig                    |    2
 src/bun.js/api/bun/subprocess.zig      |    6
 src/bun.js/bindings/bindings.cpp       |    2
 src/bun.js/event_loop.zig              |    3
 src/bun.js/webcore/blob.zig            |   20
 src/bun.js/webcore/body.zig            |   18
 src/bun.js/webcore/response.zig        |  260
 src/bun.js/webcore/streams.zig         |   89
 src/cli/create_command.zig             |   56
 src/cli/upgrade_command.zig            |   28
 src/deps/uws.zig                       |   16
 src/http/zlib.zig                      |    1
 src/http_client_async.zig              |  373
 src/zlib.zig                           |    8
 test/js/web/fetch/fetch-gzip.test.ts   |    4
 test/js/web/fetch/fetch.stream.test.ts | 1129
 test/js/web/fetch/fetch.test.ts        |   17
 test/js/web/fetch/fixture.png          |  Bin 0 -> 82022 bytes
 test/js/web/fetch/fixture.png.gz       |  Bin 0 -> 78116 bytes
 19 files changed, 1876 insertions(+), 156 deletions(-)
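
The diff below adds streaming response bodies to fetch(): instead of buffering the whole body and resolving once, the HTTP thread now reports progress as chunks arrive, and the JS side drains them into a ReadableStream. The central handoff in the FetchTasklet changes is a mutex-guarded scratch buffer plus an atomic "at most one scheduled callback" flag. What follows is a minimal sketch of that pattern only, written against the Zig 0.11-era std APIs this diff already uses; the names and types are illustrative, not Bun's:

    const std = @import("std");

    /// Network-thread producer / JS-thread consumer handoff, modeled on
    /// FetchTasklet.callback / onProgressUpdate in the diff below.
    const ChunkQueue = struct {
        mutex: std.Thread.Mutex = .{},
        buffer: std.ArrayList(u8),
        /// mirrors `has_schedule_callback`: collapse many chunks into one task
        scheduled: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),

        /// HTTP thread: append a chunk; returns true if the caller should
        /// enqueue a drain task (i.e. none is pending yet)
        fn push(this: *ChunkQueue, chunk: []const u8) !bool {
            this.mutex.lock();
            defer this.mutex.unlock();
            try this.buffer.appendSlice(chunk);
            // a null result means the swap from false -> true succeeded
            return this.scheduled.compareAndSwap(false, true, .Acquire, .Monotonic) == null;
        }

        /// JS thread: take everything buffered so far and allow rescheduling
        fn drain(this: *ChunkQueue, allocator: std.mem.Allocator) ![]u8 {
            this.mutex.lock();
            defer {
                this.scheduled.store(false, .Monotonic);
                this.mutex.unlock();
            }
            const chunk = try allocator.dupe(u8, this.buffer.items);
            this.buffer.clearRetainingCapacity();
            return chunk;
        }
    };
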
diff --git a/misctools/fetch.zig b/misctools/fetch.zig index e450ab5d3..bb9e09a2a 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -187,7 +187,7 @@ pub fn main() anyerror!void { var ctx = try default_allocator.create(HTTP.HTTPChannelContext); ctx.* = .{ .channel = channel, - .http = try HTTP.AsyncHTTP.init(default_allocator, args.method, args.url, args.headers, args.headers_buf, response_body_string, args.body, 0, HTTP.FetchRedirect.follow), + .http = try HTTP.AsyncHTTP.init(default_allocator, args.method, args.url, args.headers, args.headers_buf, response_body_string, args.body, 0, HTTP.FetchRedirect.follow,), }; ctx.http.callback = HTTP.HTTPChannelContext.callback; var batch = HTTPThread.Batch{}; diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 4fb02b8af..b8bf6939c 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -616,7 +616,11 @@ pub const Subprocess = struct { return; }, .err => |err| { - this.status = .{ .err = err }; + if (err == .Error) { + this.status = .{ .err = err.Error }; + } else { + this.status = .{ .err = JSC.Node.Syscall.Error.fromCode(.CANCELED, .read) }; + } this.fifo.close(); return; diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index 37594288d..badfd3437 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -1181,7 +1181,7 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void* for (size_t j = 0; j < end; j++) { PicoHTTPHeader header = pico_headers.ptr[j]; - if (header.value.len == 0) + if (header.value.len == 0 || header.name.len == 0) continue; StringView nameView = StringView(reinterpret_cast<const char*>(header.name.ptr), header.name.len); diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 92613d0f0..92874b6a4 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -545,8 +545,7 @@ pub const EventLoop = struct { }, .FetchTasklet => { var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?; - fetch_task.onDone(); - fetch_task.deinit(); + fetch_task.onProgressUpdate(); }, @field(Task.Tag, @typeName(AsyncTransformTask)) => { var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?; diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index df2e17ce4..c794ab59b 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -3794,6 +3794,10 @@ pub const Blob = struct { } else { return build.blob.dupe(); } + } else if (current.toSliceClone(global)) |sliced| { + if (sliced.allocator.get()) |allocator| { + return Blob.initWithAllASCII(bun.constStrToU8(sliced.slice()), allocator, global, false); + } } }, @@ -3886,6 +3890,14 @@ pub const Blob = struct { could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); joiner.append(blob.sharedView(), 0, null); continue; + } else if (current.toSliceClone(global)) |sliced| { + const allocator = sliced.allocator.get(); + could_have_non_ascii = could_have_non_ascii or allocator != null; + joiner.append( + sliced.slice(), + 0, + allocator, + ); } }, else => {}, @@ -3900,6 +3912,14 @@ pub const Blob = struct { if (current.as(Blob)) |blob| { could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); joiner.append(blob.sharedView(), 0, null); + } else if (current.toSliceClone(global)) |sliced| { + const allocator = sliced.allocator.get(); + could_have_non_ascii = could_have_non_ascii or allocator != null; + 
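// --- illustrative aside, not part of the diff ---
// Stream errors are now a two-armed union (see StreamResult.err in
// streams.zig further down): either a syscall error or an arbitrary JS value
// raised from a stream. Call sites that can only record an errno, like the
// Subprocess status above, collapse the JS arm to a placeholder code.
// Self-contained sketch of that shape (the types are stand-ins, not Bun's):
const std = @import("std");

const StreamError = union(enum) {
    errno: std.os.E,
    js_value: usize, // stand-in for an opaque JSC.JSValue handle

    fn toErrno(this: StreamError) std.os.E {
        return switch (this) {
            .errno => |e| e,
            // a JS exception carries no errno; report it as a canceled read
            .js_value => .CANCELED,
        };
    }
};
// --- end aside ---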
joiner.append( + sliced.slice(), + 0, + allocator, + ); } }, diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index b59125bc6..fa0ec9b24 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -210,10 +210,24 @@ pub const Body = struct { /// used in HTTP server to ignore request bodies unless asked for it onStartBuffering: ?*const fn (ctx: *anyopaque) void = null, onStartStreaming: ?*const fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, + size_hint: Blob.SizeType = 0, deinit: bool = false, action: Action = Action{ .none = {} }, + /// For Http Client requests + /// when Content-Length is provided this represents the whole size of the request + /// If chunked encoded this will represent the total received size (ignoring the chunk headers) + /// If the size is unknown will be 0 + fn sizeHint(this: *const PendingValue) Blob.SizeType { + if (this.readable) |readable| { + if (readable.ptr == .Bytes) { + return readable.ptr.Bytes.size_hint; + } + } + return this.size_hint; + } + pub fn toAnyBlob(this: *PendingValue) ?AnyBlob { if (this.promise != null) return null; @@ -375,6 +389,7 @@ pub const Body = struct { .Blob => this.Blob.size, .InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)), .WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())), + .Locked => this.Locked.sizeHint(), // .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len), else => 0, }; @@ -382,9 +397,9 @@ pub const Body = struct { pub fn fastSize(this: *const Value) Blob.SizeType { return switch (this.*) { - .Blob => this.Blob.size, .InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)), .WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.byteSlice().len)), + .Locked => this.Locked.sizeHint(), // .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len), else => 0, }; @@ -394,6 +409,7 @@ pub const Body = struct { return switch (this.*) { .InternalBlob => this.InternalBlob.sliceConst().len, .WTFStringImpl => this.WTFStringImpl.byteSlice().len, + .Locked => this.Locked.sizeHint(), // .InlineBlob => this.InlineBlob.sliceConst().len, else => 0, }; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 4b15efc0e..01ecfad36 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -44,6 +44,7 @@ const JSPrinter = bun.js_printer; const picohttp = @import("root").bun.picohttp; const StringJoiner = @import("../../string_joiner.zig"); const uws = @import("root").bun.uws; +const Mutex = @import("../../lock.zig").Lock; const InlineBlob = JSC.WebCore.InlineBlob; const AnyBlob = JSC.WebCore.AnyBlob; @@ -123,6 +124,7 @@ pub const Response = struct { pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void { const Writer = @TypeOf(writer); try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len())}); + { formatter.indent += 1; defer formatter.indent -|= 1; @@ -618,11 +620,21 @@ pub const Fetch = struct { javascript_vm: *VirtualMachine = undefined, global_this: *JSGlobalObject = undefined, request_body: HTTPRequestBody = undefined, + /// buffer being used by AsyncHTTP response_buffer: MutableString = undefined, + /// buffer used to stream response to JS + scheduled_response_buffer: MutableString = undefined, + /// response strong ref + response: JSC.Strong = .{}, request_headers: Headers = Headers{ .allocator = 
undefined }, promise: JSC.JSPromise.Strong, concurrent_task: JSC.ConcurrentTask = .{}, poll_ref: JSC.PollRef = .{}, + /// For Http Client requests + /// when Content-Length is provided this represents the whole size of the request + /// If chunked encoded this will represent the total received size (ignoring the chunk headers) + /// If is not chunked encoded and Content-Length is not provided this will be unknown + body_size: HTTPClient.HTTPClientResult.BodySize = .unknown, /// This is url + proxy memory buffer and is owned by FetchTasklet /// We always clone url and proxy (if informed) @@ -630,11 +642,14 @@ pub const Fetch = struct { signal: ?*JSC.WebCore.AbortSignal = null, aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), // must be stored because AbortSignal stores reason weakly abort_reason: JSValue = JSValue.zero, // Custom Hostname hostname: ?[]u8 = null, + is_waiting_body: bool = false, + mutex: Mutex, tracker: JSC.AsyncTaskTracker, @@ -691,6 +706,8 @@ pub const Fetch = struct { this.result.deinitMetadata(); this.response_buffer.deinit(); + this.response.deinit(); + this.scheduled_response_buffer.deinit(); this.request_body.detach(); if (this.abort_reason != .zero) @@ -707,9 +724,122 @@ pub const Fetch = struct { this.javascript_vm.allocator.destroy(this); } - pub fn onDone(this: *FetchTasklet) void { + pub fn onBodyReceived(this: *FetchTasklet) void { + const success = this.result.isSuccess(); + const globalThis = this.global_this; + defer { + if (!success or !this.result.has_more) { + var vm = globalThis.bunVM(); + this.poll_ref.unref(vm); + this.clearData(); + this.deinit(); + } + } + + if (!success) { + const err = this.onReject(); + err.ensureStillAlive(); + if (this.response.get()) |response_js| { + if (response_js.as(Response)) |response| { + const body = response.body; + if (body.value == .Locked) { + if (body.value.Locked.readable) |readable| { + readable.ptr.Bytes.onData( + .{ + .err = .{ .JSValue = err }, + }, + bun.default_allocator, + ); + return; + } + } + + response.body.value.toErrorInstance(err, globalThis); + return; + } + } + + globalThis.throwValue(err); + return; + } + + if (this.response.get()) |response_js| { + if (response_js.as(Response)) |response| { + const body = response.body; + if (body.value == .Locked) { + if (body.value.Locked.readable) |readable| { + if (readable.ptr == .Bytes) { + readable.ptr.Bytes.size_hint = this.getSizeHint(); + + var scheduled_response_buffer = this.scheduled_response_buffer.list; + + const chunk = scheduled_response_buffer.items; + + if (this.result.has_more) { + readable.ptr.Bytes.onData( + .{ + .temporary = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + + // clean for reuse later + this.scheduled_response_buffer.reset(); + } else { + readable.ptr.Bytes.onData( + .{ + .temporary_and_done = bun.ByteList.initConst(chunk), + }, + bun.default_allocator, + ); + } + + return; + } + } else { + response.body.value.Locked.size_hint = this.getSizeHint(); + } + // we will reach here when not streaming + if (!this.result.has_more) { + var scheduled_response_buffer = this.scheduled_response_buffer.list; + + // done resolve body + var old = body.value; + var body_value = Body.Value{ + .InternalBlob = .{ + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + response.body.value = body_value; + + this.scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + 
.items = &.{}, + .capacity = 0, + }, + }; + + if (old == .Locked) { + old.resolve(&response.body.value, this.global_this); + } + } + } + } + } + } + + pub fn onProgressUpdate(this: *FetchTasklet) void { JSC.markBinding(@src()); + this.mutex.lock(); + defer { + this.has_schedule_callback.store(false, .Monotonic); + this.mutex.unlock(); + } + if (this.is_waiting_body) { + return this.onBodyReceived(); + } const globalThis = this.global_this; var ref = this.promise; @@ -718,13 +848,22 @@ pub const Fetch = struct { var poll_ref = this.poll_ref; var vm = globalThis.bunVM(); - defer poll_ref.unref(vm); if (promise_value.isEmptyOrUndefinedOrNull()) { + poll_ref.unref(vm); this.clearData(); + this.deinit(); return; } + defer { + if (!this.is_waiting_body) { + poll_ref.unref(vm); + this.clearData(); + this.deinit(); + } + } + const promise = promise_value.asAnyPromise().?; const tracker = this.tracker; tracker.willDispatch(globalThis); @@ -735,7 +874,6 @@ pub const Fetch = struct { false => this.onReject(), }; result.ensureStillAlive(); - this.clearData(); promise_value.ensureStillAlive(); @@ -784,25 +922,77 @@ pub const Fetch = struct { return fetch_error.toErrorInstance(this.global_this); } - fn toBodyValue(this: *FetchTasklet) Body.Value { - var response_buffer = this.response_buffer.list; - this.response_buffer = .{ - .allocator = default_allocator, - .list = .{ - .items = &.{}, - .capacity = 0, - }, + pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult { + const this = bun.cast(*FetchTasklet, ctx); + if (this.http) |http| { + http.enableBodyStreaming(); + } + if (this.aborted.load(.Acquire)) { + return JSC.WebCore.DrainResult{ + .aborted = {}, + }; + } + + this.mutex.lock(); + defer this.mutex.unlock(); + const size_hint = this.getSizeHint(); + + var scheduled_response_buffer = this.scheduled_response_buffer.list; + // This means we have received part of the body but not the whole thing + if (scheduled_response_buffer.items.len > 0) { + this.scheduled_response_buffer = .{ + .allocator = default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + + return .{ + .owned = .{ + .list = scheduled_response_buffer.toManaged(bun.default_allocator), + .size_hint = size_hint, + }, + }; + } + + return .{ + .estimated_size = size_hint, }; + } + + fn getSizeHint(this: *FetchTasklet) Blob.SizeType { + return switch (this.body_size) { + .content_length => @truncate(this.body_size.content_length), + .total_received => @truncate(this.body_size.total_received), + else => 0, + }; + } - // if (response_buffer.items.len < InlineBlob.available_bytes) { - // const inline_blob = InlineBlob.init(response_buffer.items); - // defer response_buffer.deinit(bun.default_allocator); - // return .{ .InlineBlob = inline_blob }; - // } + fn toBodyValue(this: *FetchTasklet) Body.Value { + if (this.is_waiting_body) { + const response = Body.Value{ + .Locked = .{ + .size_hint = this.getSizeHint(), + .task = this, + .global = this.global_this, + .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback, + }, + }; + return response; + } + var scheduled_response_buffer = this.scheduled_response_buffer.list; const response = Body.Value{ .InternalBlob = .{ - .bytes = response_buffer.toManaged(bun.default_allocator), + .bytes = scheduled_response_buffer.toManaged(bun.default_allocator), + }, + }; + this.scheduled_response_buffer = .{ + .allocator = default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, }, }; @@ -811,6 +1001,7 @@ pub const Fetch = struct { fn 
toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response { const http_response = this.result.response; + this.is_waiting_body = this.result.has_more; return Response{ .allocator = allocator, .url = bun.String.createAtomIfPossible(this.result.href), @@ -830,7 +1021,10 @@ pub const Fetch = struct { const allocator = bun.default_allocator; var response = allocator.create(Response) catch unreachable; response.* = this.toResponse(allocator); - return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response); + const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response); + response_js.ensureStillAlive(); + this.response = JSC.Strong.create(response_js, this.global_this); + return response_js; } pub fn get( @@ -843,6 +1037,14 @@ pub const Fetch = struct { var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet); fetch_tasklet.* = .{ + .mutex = Mutex.init(), + .scheduled_response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, .response_buffer = MutableString{ .allocator = bun.default_allocator, .list = .{ @@ -905,6 +1107,8 @@ pub const Fetch = struct { fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout; fetch_tasklet.http.?.client.verbose = fetch_options.verbose; fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive; + // we wanna to return after headers are received + fetch_tasklet.http.?.signalHeaderProgress(); if (fetch_tasklet.request_body == .Sendfile) { std.debug.assert(fetch_options.url.isHTTP()); @@ -973,8 +1177,26 @@ pub const Fetch = struct { } pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void { - task.response_buffer = result.body.?.*; + task.mutex.lock(); + defer task.mutex.unlock(); task.result = result; + task.body_size = result.body_size; + + const success = result.isSuccess(); + task.response_buffer = result.body.?.*; + + if (success) { + _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM"); + } + + // reset for reuse + task.response_buffer.reset(); + + if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| { + if (has_schedule_callback) { + return; + } + } task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit)); } }; diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index be6942392..955d10ffb 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -525,9 +525,16 @@ pub const StreamResult = union(Tag) { into_array: IntoArray, into_array_and_done: IntoArray, pending: *Pending, - err: Syscall.Error, + + err: union(Err) { Error: Syscall.Error, JSValue: JSC.JSValue }, + done: void, + pub const Err = enum { + Error, + JSValue, + }; + pub const Tag = enum { owned, owned_and_done, @@ -757,7 +764,14 @@ pub const StreamResult = union(Tag) { promise.asValue(globalThis).unprotect(); switch (result) { .err => |err| { - promise.reject(globalThis, err.toJSC(globalThis)); + if (err == .Error) { + promise.reject(globalThis, err.Error.toJSC(globalThis)); + } else { + const js_err = err.JSValue; + js_err.ensureStillAlive(); + js_err.unprotect(); + promise.reject(globalThis, js_err); + } }, .done => { promise.resolve(globalThis, JSValue.jsBoolean(false)); @@ -803,7 +817,13 @@ pub const StreamResult = union(Tag) { }, .err => |err| { - return JSC.JSPromise.rejectedPromise(globalThis, 
JSValue.c(err.toJS(globalThis))).asValue(globalThis); + if (err == .Error) { + return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.Error.toJS(globalThis))).asValue(globalThis); + } + const js_err = err.JSValue; + js_err.ensureStillAlive(); + js_err.unprotect(); + return JSC.JSPromise.rejectedPromise(globalThis, js_err).asValue(globalThis); }, // false == controller.close() @@ -2380,6 +2400,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return true; } else { this.has_backpressure = !this.res.write(buf); + if (this.has_backpressure) { + this.res.onWritable(*@This(), onWritable, this); + } return true; } @@ -2986,7 +3009,14 @@ pub fn ReadableStreamSource( pub fn processResult(globalThis: *JSGlobalObject, flags: JSValue, result: StreamResult) JSC.JSValue { switch (result) { .err => |err| { - globalThis.vm().throwError(globalThis, err.toJSC(globalThis)); + if (err == .Error) { + globalThis.vm().throwError(globalThis, err.Error.toJSC(globalThis)); + } else { + const js_err = err.JSValue; + js_err.ensureStillAlive(); + js_err.unprotect(); + globalThis.vm().throwError(globalThis, js_err); + } return JSValue.jsUndefined(); }, .temporary_and_done, .owned_and_done, .into_array_and_done => { @@ -3301,12 +3331,29 @@ pub const ByteStream = struct { if (is_really_done) { this.done = true; - this.pending.result = .{ - .into_array_and_done = .{ - .value = this.value(), - .len = @as(Blob.SizeType, @truncate(to_copy.len)), - }, - }; + + if (to_copy.len == 0) { + if (stream == .err) { + if (stream.err == .Error) { + this.pending.result = .{ .err = .{ .Error = stream.err.Error } }; + } + const js_err = stream.err.JSValue; + js_err.ensureStillAlive(); + js_err.protect(); + this.pending.result = .{ .err = .{ .JSValue = js_err } }; + } else { + this.pending.result = .{ + .done = {}, + }; + } + } else { + this.pending.result = .{ + .into_array_and_done = .{ + .value = this.value(), + .len = @as(Blob.SizeType, @truncate(to_copy.len)), + }, + }; + } } else { this.pending.result = .{ .into_array = .{ @@ -3488,7 +3535,7 @@ pub const ReadResult = union(enum) { pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult { return switch (this) { .pending => .{ .pending = pending }, - .err => .{ .err = this.err }, + .err => .{ .err = .{ .Error = this.err } }, .done => .{ .done = {} }, .read => |slice| brk: { const owned = slice.ptr != buf.ptr; @@ -4064,17 +4111,21 @@ pub const File = struct { this.concurrent.read = @as(Blob.SizeType, @truncate(result catch |err| { if (@hasField(HTTPClient.NetworkThread.Completion, "result")) { this.pending.result = .{ - .err = Syscall.Error{ - .errno = @as(Syscall.Error.Int, @intCast(-completion.result)), - .syscall = .read, + .err = .{ + .Error = Syscall.Error{ + .errno = @as(Syscall.Error.Int, @intCast(-completion.result)), + .syscall = .read, + }, }, }; } else { this.pending.result = .{ - .err = Syscall.Error{ - // this is too hacky - .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))), - .syscall = .read, + .err = .{ + .Error = Syscall.Error{ + // this is too hacky + .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))), + .syscall = .read, + }, }, }; } @@ -4101,7 +4152,7 @@ pub const File = struct { else => {}, } - this.pending.result = .{ .err = err }; + this.pending.result = .{ .err = .{ .Error = err } }; scheduleMainThreadTask(this); return; }, diff --git a/src/cli/create_command.zig 
b/src/cli/create_command.zig index 544329a98..284de1fd5 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -1852,7 +1852,19 @@ pub const Example = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, api_url, header_entries, headers_buf, mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + api_url, + header_entries, + headers_buf, + mutable, + "", + 60 * std.time.ns_per_min, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); async_http.client.progress_node = progress; const response = try async_http.sendSync(true); @@ -1916,7 +1928,19 @@ pub const Example = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = ctx.allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + url, + .{}, + "", + mutable, + "", + 60 * std.time.ns_per_min, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); async_http.client.progress_node = progress; var response = try async_http.sendSync(true); @@ -1992,7 +2016,19 @@ pub const Example = struct { http_proxy = env_loader.getHttpProxy(parsed_tarball_url); - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, parsed_tarball_url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + parsed_tarball_url, + .{}, + "", + mutable, + "", + 60 * std.time.ns_per_min, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); async_http.client.progress_node = progress; refresher.maybeRefresh(); @@ -2022,7 +2058,19 @@ pub const Example = struct { var mutable = try ctx.allocator.create(MutableString); mutable.* = try MutableString.init(ctx.allocator, 2048); - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, url, .{}, "", mutable, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + url, + .{}, + "", + mutable, + "", + 60 * std.time.ns_per_min, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); if (Output.enable_ansi_colors) { async_http.client.progress_node = progress_node; diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index 3fadfe5c2..79a7777f3 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -223,7 +223,19 @@ pub const UpgradeCommand = struct { // ensure very stable memory address var async_http: *HTTP.AsyncHTTP = allocator.create(HTTP.AsyncHTTP) catch unreachable; - async_http.* = HTTP.AsyncHTTP.initSync(allocator, .GET, api_url, header_entries, headers_buf, &metadata_body, "", 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + allocator, + .GET, + api_url, + header_entries, + headers_buf, + &metadata_body, + "", + 60 * std.time.ns_per_min, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); if (!silent) async_http.client.progress_node = progress; const response = try async_http.sendSync(true); @@ -454,7 +466,19 @@ pub const UpgradeCommand = struct { var zip_file_buffer = try ctx.allocator.create(MutableString); 
zip_file_buffer.* = try MutableString.init(ctx.allocator, @max(version.size, 1024)); - async_http.* = HTTP.AsyncHTTP.initSync(ctx.allocator, .GET, zip_url, .{}, "", zip_file_buffer, "", timeout, http_proxy, null, HTTP.FetchRedirect.follow); + async_http.* = HTTP.AsyncHTTP.initSync( + ctx.allocator, + .GET, + zip_url, + .{}, + "", + zip_file_buffer, + "", + timeout, + http_proxy, + null, + HTTP.FetchRedirect.follow, + ); async_http.client.timeout = timeout; async_http.client.progress_node = progress; const response = try async_http.sendSync(true); diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 5714eb09f..2610b0720 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -200,6 +200,7 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type { this.socket, ).?; } + pub fn flush(this: ThisSocket) void { return us_socket_flush( comptime ssl_int, @@ -687,6 +688,13 @@ pub const SocketContext = opaque { us_socket_context_free(@as(i32, 0), this); } + fn getLoop(this: *SocketContext, ssl: bool) ?*Loop { + if (ssl) { + return us_socket_context_loop(@as(i32, 1), this); + } + return us_socket_context_loop(@as(i32, 0), this); + } + /// closes and deinit the SocketContexts pub fn deinit(this: *SocketContext, ssl: bool) void { this.close(ssl); @@ -1000,6 +1008,14 @@ pub const Poll = opaque { us_poll_stop(self, loop); } + pub fn change(self: *Poll, loop: *Loop, events: i32) void { + us_poll_change(self, loop, events); + } + + pub fn getEvents(self: *Poll) i32 { + return us_poll_events(self); + } + pub fn data(self: *Poll, comptime Data: type) *Data { return us_poll_ext(self).?; } diff --git a/src/http/zlib.zig b/src/http/zlib.zig index e827dc1b3..8144930c2 100644 --- a/src/http/zlib.zig +++ b/src/http/zlib.zig @@ -11,7 +11,6 @@ fn initMutableString(allocator: std.mem.Allocator) anyerror!MutableString { } const BufferPool = bun.ObjectPool(MutableString, initMutableString, false, 4); - pub fn get(allocator: std.mem.Allocator) *MutableString { return &BufferPool.get(allocator).data; } diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 4998cec85..725e960d6 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -30,6 +30,7 @@ const ObjectPool = @import("./pool.zig").ObjectPool; const SOCK = os.SOCK; const Arena = @import("./mimalloc_arena.zig").Arena; const ZlibPool = @import("./http/zlib.zig"); + const URLBufferPool = ObjectPool([4096]u8, null, false, 10); const uws = bun.uws; pub const MimeType = @import("./http/mime_type.zig"); @@ -723,6 +724,11 @@ pub const HTTPThread = struct { this.loop.wakeup(); } + pub fn wakeup(this: *@This()) void { + if (this.has_awoken.load(.Monotonic)) + this.loop.wakeup(); + } + pub fn schedule(this: *@This(), batch: Batch) void { if (batch.len == 0) return; @@ -808,11 +814,12 @@ pub fn onClose( // if the peer closed after a full chunk, treat this // as if the transfer had complete, browsers appear to ignore // a missing 0\r\n chunk - if (in_progress and client.state.transfer_encoding == .chunked) { + if (in_progress and client.state.isChunkedEncoding()) { if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { var buf = client.state.getBodyBuffer(); if (buf.list.items.len > 0) { - client.done(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); + client.state.received_last_chunk = true; + client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); return; } } @@ -988,14 +995,17 @@ pub const InternalState = 
struct { response_message_buffer: MutableString = undefined, pending_response: picohttp.Response = undefined, allow_keepalive: bool = true, + received_last_chunk: bool = false, transfer_encoding: Encoding = Encoding.identity, encoding: Encoding = Encoding.identity, content_encoding_i: u8 = std.math.maxInt(u8), chunked_decoder: picohttp.phr_chunked_decoder = .{}, + zlib_reader: ?*Zlib.ZlibReaderArrayList = null, stage: Stage = Stage.pending, body_out_str: ?*MutableString = null, compressed_body: MutableString = undefined, - body_size: usize = 0, + content_length: ?usize = null, + total_body_received: usize = 0, request_body: []const u8 = "", original_request_body: HTTPRequestBody = .{ .bytes = "" }, request_sent_len: usize = 0, @@ -1015,12 +1025,20 @@ pub const InternalState = struct { }; } + pub fn isChunkedEncoding(this: *InternalState) bool { + return this.transfer_encoding == Encoding.chunked; + } + pub fn reset(this: *InternalState) void { this.compressed_body.deinit(); this.response_message_buffer.deinit(); var body_msg = this.body_out_str; if (body_msg) |body| body.reset(); + if (this.zlib_reader) |reader| { + this.zlib_reader = null; + reader.deinit(); + } this.* = .{ .body_out_str = body_msg, @@ -1042,27 +1060,78 @@ pub const InternalState = struct { } } - fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void { - defer this.compressed_body.deinit(); + fn isDone(this: *InternalState) bool { + if (this.isChunkedEncoding()) { + return this.received_last_chunk; + } + + if (this.content_length) |content_length| { + return this.total_body_received >= content_length; + } + // TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection + return true; + } + + fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void { + defer this.compressed_body.reset(); var gzip_timer: std.time.Timer = undefined; if (extremely_verbose) gzip_timer = std.time.Timer.start() catch @panic("Timer failure"); - body_out_str.list.expandToCapacity(); + var reader: *Zlib.ZlibReaderArrayList = undefined; + if (this.zlib_reader) |current_reader| { + reader = current_reader; + reader.zlib.next_in = buffer.ptr; + reader.zlib.avail_in = @as(u32, @truncate(buffer.len)); + + reader.list = body_out_str.list; + const initial = body_out_str.list.items.len; + body_out_str.list.expandToCapacity(); + if (body_out_str.list.capacity == initial) { + try body_out_str.list.ensureUnusedCapacity(body_out_str.allocator, 4096); + body_out_str.list.expandToCapacity(); + } + reader.zlib.next_out = &body_out_str.list.items[initial]; + reader.zlib.avail_out = @as(u32, @truncate(body_out_str.list.capacity - initial)); + // we reset the total out so we can track how much we decompressed this time + reader.zlib.total_out = initial; + } else { + reader = try Zlib.ZlibReaderArrayList.initWithOptionsAndListAllocator( + buffer, + &body_out_str.list, + body_out_str.allocator, + default_allocator, + .{ + // TODO: add br support today we support gzip and deflate only + // zlib.MAX_WBITS = 15 + // to (de-)compress deflate format, use wbits = -zlib.MAX_WBITS + // to (de-)compress zlib format, use wbits = zlib.MAX_WBITS + // to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16 + .windowBits = if (this.encoding == Encoding.gzip) Zlib.MAX_WBITS | 16 else -Zlib.MAX_WBITS, + }, + ); + this.zlib_reader = reader; + } - ZlibPool.decompress(buffer.list.items, body_out_str, default_allocator) catch |err| { - 
Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{bun.asByteSlice(@errorName(err))}); - Output.flush(); - return err; + reader.readAll() catch |err| { + if (this.isDone() or error.ShortRead != err) { + Output.prettyErrorln("<r><red>Zlib error: {s}<r>", .{bun.asByteSlice(@errorName(err))}); + Output.flush(); + return err; + } }; if (extremely_verbose) this.gzip_elapsed = gzip_timer.read(); } - pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !void { + fn decompress(this: *InternalState, buffer: MutableString, body_out_str: *MutableString) !void { + try this.decompressConst(buffer.list.items, body_out_str); + } + + pub fn processBodyBuffer(this: *InternalState, buffer: MutableString) !usize { var body_out_str = this.body_out_str.?; switch (this.encoding) { @@ -1080,10 +1149,10 @@ pub const InternalState = struct { }, } - this.postProcessBody(body_out_str); + return this.postProcessBody(); } - pub fn postProcessBody(this: *InternalState, body_out_str: *MutableString) void { + pub fn postProcessBody(this: *InternalState) usize { var response = &this.pending_response; // if it compressed with this header, it is no longer if (this.content_encoding_i < response.headers.len) { @@ -1093,7 +1162,7 @@ pub const InternalState = struct { this.content_encoding_i = std.math.maxInt(@TypeOf(this.content_encoding_i)); } - this.body_size = @as(usize, @truncate(body_out_str.list.items.len)); + return this.body_out_str.?.list.items.len; } }; @@ -1119,7 +1188,7 @@ disable_keepalive: bool = false, state: InternalState = .{}, -completion_callback: HTTPClientResult.Callback = undefined, +result_callback: HTTPClientResult.Callback = undefined, /// Some HTTP servers (such as npm) report Last-Modified times but ignore If-Modified-Since. /// This is a workaround for that. 
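// --- illustrative aside, not part of the diff ---
// Two predicates drive the streaming loop above. `isDone` derives body
// completion from whatever framing is actually available, and the
// decompressor treats error.ShortRead as benign until that point, because
// more compressed bytes may still be in flight. Self-contained sketch of
// both policies:
fn bodyIsDone(chunked: bool, received_last_chunk: bool, content_length: ?usize, total_received: usize) bool {
    if (chunked) return received_last_chunk;
    if (content_length) |len| return total_received >= len;
    // no framing info: nothing further to wait on here
    return true;
}

fn checkInflate(err: anyerror, done: bool) anyerror!void {
    // a short read mid-stream just means "feed me more input";
    // after the last byte, or for any other error, it is fatal
    if (done or err != error.ShortRead) return err;
}
// --- end aside ---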
@@ -1135,7 +1204,20 @@ proxy_tunnel: ?ProxyTunnel = null, aborted: ?*std.atomic.Atomic(bool) = null, async_http_id: u32 = 0, hostname: ?[]u8 = null, -pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entries: Headers.Entries, header_buf: string, signal: ?*std.atomic.Atomic(bool), hostname: ?[]u8) HTTPClient { +signal_header_progress: *std.atomic.Atomic(bool), +enable_body_stream: *std.atomic.Atomic(bool), + +pub fn init( + allocator: std.mem.Allocator, + method: Method, + url: URL, + header_entries: Headers.Entries, + header_buf: string, + signal: ?*std.atomic.Atomic(bool), + hostname: ?[]u8, + signal_header_progress: *std.atomic.Atomic(bool), + enable_body_stream: *std.atomic.Atomic(bool), +) HTTPClient { return HTTPClient{ .allocator = allocator, .method = method, @@ -1144,6 +1226,8 @@ pub fn init(allocator: std.mem.Allocator, method: Method, url: URL, header_entri .header_buf = header_buf, .aborted = signal, .hostname = hostname, + .signal_header_progress = signal_header_progress, + .enable_body_stream = enable_body_stream, }; } @@ -1160,9 +1244,6 @@ pub fn deinit(this: *HTTPClient) void { tunnel.deinit(); this.proxy_tunnel = null; } - - this.state.compressed_body.deinit(); - this.state.response_message_buffer.deinit(); } const Stage = enum(u8) { @@ -1286,7 +1367,7 @@ pub const AsyncHTTP = struct { next: ?*AsyncHTTP = null, task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP }, - completion_callback: HTTPClientResult.Callback = undefined, + result_callback: HTTPClientResult.Callback = undefined, /// Timeout in nanoseconds timeout: usize = 0, @@ -1303,6 +1384,9 @@ pub const AsyncHTTP = struct { elapsed: u64 = 0, gzip_elapsed: u64 = 0, + signal_header_progress: std.atomic.Atomic(bool), + enable_body_stream: std.atomic.Atomic(bool), + pub var active_requests_count = std.atomic.Atomic(usize).init(0); pub var max_simultaneous_requests = std.atomic.Atomic(usize).init(256); @@ -1332,6 +1416,16 @@ pub const AsyncHTTP = struct { } } + pub fn signalHeaderProgress(this: *AsyncHTTP) void { + @fence(.Release); + this.client.signal_header_progress.store(true, .Release); + } + + pub fn enableBodyStreaming(this: *AsyncHTTP) void { + @fence(.Release); + this.client.enable_body_stream.store(true, .Release); + } + pub fn clearData(this: *AsyncHTTP) void { this.response_headers.deinit(this.allocator); this.response_headers = .{}; @@ -1371,12 +1465,14 @@ pub const AsyncHTTP = struct { .request_header_buf = headers_buf, .request_body = .{ .bytes = request_body }, .response_buffer = response_buffer, - .completion_callback = callback, + .result_callback = callback, .http_proxy = http_proxy, .async_http_id = if (signal != null) async_http_id.fetchAdd(1, .Monotonic) else 0, + .signal_header_progress = std.atomic.Atomic(bool).init(false), + .enable_body_stream = std.atomic.Atomic(bool).init(false), }; - this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname); + this.client = HTTPClient.init(allocator, method, url, headers, headers_buf, signal, hostname, &this.signal_header_progress, &this.enable_body_stream); this.client.async_http_id = this.async_http_id; this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; @@ -1537,7 +1633,7 @@ pub const AsyncHTTP = struct { var ctx = try bun.default_allocator.create(SingleHTTPChannel); ctx.* = SingleHTTPChannel.init(); - this.completion_callback = HTTPClientResult.Callback.New( + this.result_callback = HTTPClientResult.Callback.New( *SingleHTTPChannel, sendSyncCallback, 
).init(ctx); @@ -1557,12 +1653,10 @@ pub const AsyncHTTP = struct { unreachable; } - pub fn onAsyncHTTPComplete(this: *AsyncHTTP, result: HTTPClientResult) void { + pub fn onAsyncHTTPCallback(this: *AsyncHTTP, result: HTTPClientResult) void { std.debug.assert(this.real != null); - const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); - std.debug.assert(active_requests > 0); - var completion = this.completion_callback; + var callback = this.result_callback; this.elapsed = http_thread.timer.read() -| this.elapsed; this.redirected = this.client.remaining_redirect_count != default_redirect_count; if (!result.isSuccess()) { @@ -1574,19 +1668,27 @@ pub const AsyncHTTP = struct { this.response = result.response; this.state.store(.success, .Monotonic); } - this.client.deinit(); - this.real.?.* = this.*; - this.real.?.response_buffer = this.response_buffer; + if (result.has_more) { + callback.function(callback.ctx, result); + } else { + this.client.deinit(); + + this.real.?.* = this.*; + this.real.?.response_buffer = this.response_buffer; - log("onAsyncHTTPComplete: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); + log("onAsyncHTTPCallback: {any}", .{bun.fmt.fmtDuration(this.elapsed)}); - default_allocator.destroy(this); + default_allocator.destroy(this); - completion.function(completion.ctx, result); + const active_requests = AsyncHTTP.active_requests_count.fetchSub(1, .Monotonic); + std.debug.assert(active_requests > 0); - if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) { - http_thread.drainEvents(); + callback.function(callback.ctx, result); + + if (active_requests >= AsyncHTTP.max_simultaneous_requests.load(.Monotonic)) { + http_thread.drainEvents(); + } } } @@ -1599,7 +1701,7 @@ pub const AsyncHTTP = struct { _ = active_requests_count.fetchAdd(1, .Monotonic); this.err = null; this.state.store(.sending, .Monotonic); - this.client.completion_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPComplete).init( + this.client.result_callback = HTTPClientResult.Callback.New(*AsyncHTTP, onAsyncHTTPCallback).init( this, ); @@ -2153,6 +2255,7 @@ fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTP } pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u8, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { + log("onData {}", .{incoming_data.len}); if (this.hasSignalAborted()) { this.closeAndAbort(is_ssl, socket); return; @@ -2241,7 +2344,11 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u this.cloneMetadata(); if (!can_continue) { - this.done(is_ssl, ctx, socket); + // if is chuncked but no body is expected we mark the last chunk + this.state.received_last_chunk = true; + // if is not we ignore the content_length + this.state.content_length = 0; + this.progressUpdate(is_ssl, ctx, socket); return; } @@ -2251,35 +2358,45 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } if (body_buf.len == 0) { + // no body data yet, but we can report the headers + if (this.signal_header_progress.load(.Acquire)) { + this.progressUpdate(is_ssl, ctx, socket); + } return; } if (this.state.response_stage == .body) { { - const is_done = this.handleResponseBody(body_buf, true) catch |err| { + const report_progress = this.handleResponseBody(body_buf, true) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + 
this.progressUpdate(is_ssl, ctx, socket); return; } } } else if (this.state.response_stage == .body_chunk) { this.setTimeout(socket, 500); { - const is_done = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| { + const report_progress = this.handleResponseBodyChunkedEncoding(body_buf) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); return; } } } + + // if not reported we report partially now + if (this.signal_header_progress.load(.Acquire)) { + this.progressUpdate(is_ssl, ctx, socket); + return; + } }, .body => { @@ -2295,23 +2412,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u defer data.deinit(); const decoded_data = data.slice(); if (decoded_data.len == 0) return; - const is_done = this.handleResponseBody(decoded_data, false) catch |err| { + const report_progress = this.handleResponseBody(decoded_data, false) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); return; } } else { - const is_done = this.handleResponseBody(incoming_data, false) catch |err| { + const report_progress = this.handleResponseBody(incoming_data, false) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); return; } } @@ -2331,23 +2448,23 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u const decoded_data = data.slice(); if (decoded_data.len == 0) return; - const is_done = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| { + const report_progress = this.handleResponseBodyChunkedEncoding(decoded_data) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); return; } } else { - const is_done = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { + const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { this.closeAndFail(err, is_ssl, socket); return; }; - if (is_done) { - this.done(is_ssl, ctx, socket); + if (report_progress) { + this.progressUpdate(is_ssl, ctx, socket); return; } } @@ -2403,7 +2520,7 @@ fn fail(this: *HTTPClient, err: anyerror) void { this.state.fail = err; this.state.stage = .fail; - const callback = this.completion_callback; + const callback = this.result_callback; const result = this.toResult(this.cloned_metadata); this.state.reset(); this.proxy_tunneling = false; @@ -2440,39 +2557,50 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, amount: c_uint) void { socket.timeout(amount); } -pub fn done(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { +pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPContext(is_ssl), socket: NewHTTPContext(is_ssl).HTTPSocket) void { if (this.state.stage != .done and this.state.stage != .fail) { - if (this.aborted != null) { + const is_done = this.state.isDone(); + + if (this.aborted != null and is_done) { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } - log("done", .{}); + log("progressUpdate {}", .{is_done}); var out_str = this.state.body_out_str.?; var body = out_str.*; 
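// --- illustrative aside, not part of the diff ---
// Every progress report now carries a body-size estimate so JS can expose a
// size hint while the body is still streaming: exact when Content-Length was
// sent, a running total for chunked transfers, otherwise unknown. Sketch of
// the mapping (mirrors HTTPClientResult.BodySize defined just below and
// FetchTasklet.getSizeHint in response.zig above):
const BodySize = union(enum) {
    content_length: usize,
    total_received: usize,
    unknown: void,
};

fn sizeHint(body_size: BodySize) usize {
    return switch (body_size) {
        .content_length, .total_received => |n| n,
        .unknown => 0,
    };
}
// --- end aside ---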
this.cloned_metadata.response = this.state.pending_response; const result = this.toResult(this.cloned_metadata); - const callback = this.completion_callback; + const callback = this.result_callback; - socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); + if (is_done) { + socket.ext(**anyopaque).?.* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); - if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { - ctx.releaseSocket( - socket, - this.connected_url.hostname, - this.connected_url.getPortAuto(), - ); - } else if (!socket.isClosed()) { - socket.close(0, null); + if (this.state.allow_keepalive and !this.disable_keepalive and !socket.isClosed() and FeatureFlags.enable_keepalive) { + ctx.releaseSocket( + socket, + this.connected_url.hostname, + this.connected_url.getPortAuto(), + ); + } else if (!socket.isClosed()) { + socket.close(0, null); + } + this.state.reset(); + this.state.response_stage = .done; + this.state.request_stage = .done; + this.state.stage = .done; + this.proxy_tunneling = false; + if (comptime print_every > 0) { + print_every_i += 1; + if (print_every_i % print_every == 0) { + Output.prettyln("Heap stats for HTTP thread\n", .{}); + Output.flush(); + default_arena.dumpThreadStats(); + print_every_i = 0; + } + } } - - this.state.reset(); result.body.?.* = body; - std.debug.assert(this.state.stage != .done); - this.state.response_stage = .done; - this.state.request_stage = .done; - this.state.stage = .done; - this.proxy_tunneling = false; if (comptime print_every > 0) { print_every_i += 1; if (print_every_i % print_every == 0) { @@ -2494,6 +2622,19 @@ pub const HTTPClientResult = struct { fail: anyerror = error.NoError, redirected: bool = false, headers_buf: []picohttp.Header = &.{}, + has_more: bool = false, + + /// For Http Client requests + /// when Content-Length is provided this represents the whole size of the request + /// If chunked encoded this will represent the total received size (ignoring the chunk headers) + /// If is not chunked encoded and Content-Length is not provided this will be unknown + body_size: BodySize = .unknown, + + pub const BodySize = union(enum) { + total_received: usize, + content_length: usize, + unknown: void, + }; pub fn isSuccess(this: *const HTTPClientResult) bool { return this.fail == error.NoError; @@ -2547,6 +2688,12 @@ pub const HTTPClientResult = struct { }; pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientResult { + const body_size: HTTPClientResult.BodySize = if (this.state.isChunkedEncoding()) + .{ .total_received = this.state.total_body_received } + else if (this.state.content_length) |content_length| + .{ .content_length = content_length } + else + .{ .unknown = {} }; return HTTPClientResult{ .body = this.state.body_out_str, .response = metadata.response, @@ -2555,6 +2702,8 @@ pub fn toResult(this: *HTTPClient, metadata: HTTPResponseMetadata) HTTPClientRes .href = metadata.url, .fail = this.state.fail, .headers_buf = metadata.response.headers, + .has_more = this.state.fail == error.NoError and !this.state.isDone(), + .body_size = body_size, }; } @@ -2566,10 +2715,10 @@ const preallocate_max = 1024 * 1024 * 256; pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool { std.debug.assert(this.state.transfer_encoding == .identity); - + const content_length = this.state.content_length orelse 0; // is 
it exactly as much as we need? - if (is_only_buffer and incoming_data.len >= this.state.body_size) { - try handleResponseBodyFromSinglePacket(this, incoming_data[0..this.state.body_size]); + if (is_only_buffer and incoming_data.len >= content_length) { + try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]); return true; } else { return handleResponseBodyFromMultiplePackets(this, incoming_data); @@ -2577,6 +2726,10 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_ } fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const u8) !void { + if (!this.state.isChunkedEncoding()) { + this.state.total_body_received += incoming_data.len; + } + if (this.state.encoding.isCompressed()) { var body_buffer = this.state.body_out_str.?; if (body_buffer.list.capacity == 0) { @@ -2584,7 +2737,7 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const try body_buffer.growBy(@max(@as(usize, @intFromFloat(min)), 32)); } - try ZlibPool.decompress(incoming_data, body_buffer, default_allocator); + try this.state.decompressConst(incoming_data, body_buffer); } else { try this.state.getBodyBuffer().appendSliceExact(incoming_data); } @@ -2605,42 +2758,44 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const progress.context.maybeRefresh(); } - this.state.postProcessBody(this.state.getBodyBuffer()); + _ = this.state.postProcessBody(); } fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool { var buffer = this.state.getBodyBuffer(); + const content_length = this.state.content_length orelse 0; if (buffer.list.items.len == 0 and - this.state.body_size > 0 and this.state.body_size < preallocate_max) + content_length > 0 and incoming_data.len < preallocate_max) { - // since we don't do streaming yet, we might as well just allocate the whole thing - // when we know the expected size - buffer.list.ensureTotalCapacityPrecise(buffer.allocator, this.state.body_size) catch {}; + buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {}; } - const remaining_content_length = this.state.body_size -| buffer.list.items.len; + const remaining_content_length = content_length -| this.state.total_body_received; var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)]; _ = try buffer.write(remainder); + this.state.total_body_received += remainder.len; + if (this.progress_node) |progress| { progress.activate(); - progress.setCompletedItems(buffer.list.items.len); + progress.setCompletedItems(this.state.total_body_received); progress.context.maybeRefresh(); } - if (buffer.list.items.len == this.state.body_size) { - try this.state.processBodyBuffer(buffer.*); + // done or streaming + const is_done = this.state.total_body_received >= content_length; + if (is_done or this.enable_body_stream.load(.Acquire)) { + const processed = try this.state.processBodyBuffer(buffer.*); if (this.progress_node) |progress| { progress.activate(); - progress.setCompletedItems(buffer.list.items.len); + progress.setCompletedItems(this.state.total_body_received); progress.context.maybeRefresh(); } - return true; + return is_done or processed > 0; } - return false; } @@ -2676,6 +2831,8 @@ fn handleResponseBodyChunkedEncodingFromMultiplePackets( &bytes_decoded, ); buffer.list.items.len -|= incoming_data.len - bytes_decoded; + this.state.total_body_received += bytes_decoded; + buffer_.* = buffer; switch (pret) { @@ -2690,12 +2847,18 @@ fn 
handleResponseBodyChunkedEncodingFromMultiplePackets( progress.setCompletedItems(buffer.list.items.len); progress.context.maybeRefresh(); } + // streaming chunks + if (this.enable_body_stream.load(.Acquire)) { + const processed = try this.state.processBodyBuffer(buffer); + return processed > 0; + } return false; }, // Done else => { - try this.state.processBodyBuffer( + this.state.received_last_chunk = true; + _ = try this.state.processBodyBuffer( buffer, ); @@ -2746,6 +2909,7 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( &bytes_decoded, ); buffer.len -|= incoming_data.len - bytes_decoded; + this.state.total_body_received += bytes_decoded; switch (pret) { // Invalid HTTP response body @@ -2759,12 +2923,21 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( progress.setCompletedItems(buffer.len); progress.context.maybeRefresh(); } - try this.state.getBodyBuffer().appendSliceExact(buffer); + const body_buffer = this.state.getBodyBuffer(); + try body_buffer.appendSliceExact(buffer); + + // streaming chunks + if (this.enable_body_stream.load(.Acquire)) { + const processed = try this.state.processBodyBuffer(body_buffer.*); + return processed > 0; + } return false; }, // Done else => { + this.state.received_last_chunk = true; + try this.handleResponseBodyFromSinglePacket(buffer); std.debug.assert(this.state.body_out_str.?.list.items.ptr != buffer.ptr); if (this.progress_node) |progress| { @@ -2790,8 +2963,13 @@ pub fn handleResponseMetadata( for (response.headers, 0..) |header, header_i| { switch (hashHeaderName(header.name)) { hashHeaderConst("Content-Length") => { - const content_length = std.fmt.parseInt(@TypeOf(this.state.body_size), header.value, 10) catch 0; - this.state.body_size = content_length; + const content_length = std.fmt.parseInt(usize, header.value, 10) catch 0; + if (this.method.hasBody()) { + this.state.content_length = content_length; + } else { + // ignore body size for HEAD requests + this.state.content_length = content_length; + } }, hashHeaderConst("Content-Encoding") => { if (strings.eqlComptime(header.value, "gzip")) { @@ -2968,6 +3146,7 @@ pub fn handleResponseMetadata( // if is no redirect or if is redirect == "manual" just proceed this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body; + const content_length = this.state.content_length orelse 0; // if no body is expected we should stop processing - return this.method.hasBody() and (this.state.body_size > 0 or this.state.transfer_encoding == .chunked); + return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked); } diff --git a/src/zlib.zig b/src/zlib.zig index d79965a2a..b578f0ede 100644 --- a/src/zlib.zig +++ b/src/zlib.zig @@ -3,6 +3,8 @@ const std = @import("std"); const bun = @import("root").bun; +pub const MAX_WBITS = 15; + test "Zlib Read" { const expected_text = @embedFile("./zlib.test.txt"); const input = bun.asByteSlice(@embedFile("./zlib.test.gz")); @@ -525,7 +527,11 @@ pub const ZlibReaderArrayList = struct { pub fn readAll(this: *ZlibReader) ZlibError!void { defer { - this.list.shrinkRetainingCapacity(this.zlib.total_out); + if (this.list.items.len > this.zlib.total_out) { + this.list.shrinkRetainingCapacity(this.zlib.total_out); + } else if (this.zlib.total_out < this.list.capacity) { + this.list.items.len = this.zlib.total_out; + } this.list_ptr.* = this.list; } diff --git a/test/js/web/fetch/fetch-gzip.test.ts b/test/js/web/fetch/fetch-gzip.test.ts index 32888947b..0569eaad8 100644 --- 
a/test/js/web/fetch/fetch-gzip.test.ts +++ b/test/js/web/fetch/fetch-gzip.test.ts @@ -86,7 +86,7 @@ it("fetch() with a protocol-relative redirect that returns a buffered gzip respo server.stop(); }); -it("fetch() with a gzip response works (one chunk, streamed, with a delay", async () => { +it("fetch() with a gzip response works (one chunk, streamed, with a delay)", async () => { var server = Bun.serve({ port: 0, @@ -121,7 +121,7 @@ it("fetch() with a gzip response works (one chunk, streamed, with a delay", asyn server.stop(); }); -it("fetch() with a gzip response works (multiple chunks, TCP server", async done => { +it("fetch() with a gzip response works (multiple chunks, TCP server)", async done => { const compressed = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer(); var socketToClose!: Socket; const server = Bun.listen({ diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts new file mode 100644 index 000000000..efef6a161 --- /dev/null +++ b/test/js/web/fetch/fetch.stream.test.ts @@ -0,0 +1,1129 @@ +import { Socket, Server, TCPSocketListener } from "bun"; +import { readFileSync } from "fs"; +import { join } from "path"; +import { describe, expect, it } from "bun:test"; +import { gcTick } from "harness"; + +const fixtures = { + "fixture": readFileSync(join(import.meta.dir, "fixture.html")), + "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")), + "fixture.png.gz": readFileSync(join(import.meta.dir, "fixture.png.gz")), +}; + +const invalid = Buffer.from([0xc0]); + +const bigText = Buffer.from("a".repeat(1 * 1024 * 1024)); +const smallText = Buffer.from("Hello".repeat(16)); +const empty = Buffer.alloc(0); + +describe("fetch() with streaming", () => { + it("stream still works after response get out of scope", async () => { + let server: Server | null = null; + try { + const content = "Hello, world!\n".repeat(5); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const data = Buffer.from(content, "utf8"); + const size = data.byteLength / 5; + controller.write(data.slice(0, size)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size, size * 2)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 2, size * 3)); + await controller.flush(); + await Bun.sleep(100); + controller.write(data.slice(size * 3, size * 5)); + await controller.flush(); + + controller.close(); + }, + }), + { status: 200, headers: { "Content-Type": "text/plain" } }, + ); + }, + }); + + async function getReader() { + return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader(); + } + gcTick(false); + const reader = await getReader(); + gcTick(false); + let buffer = Buffer.alloc(0); + let parts = 0; + while (true) { + gcTick(false); + + const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>; + if (value) { + buffer = Buffer.concat([buffer, value]); + } + parts++; + if (done) { + break; + } + } + + gcTick(false); + expect(buffer.toString("utf8")).toBe(content); + expect(parts).toBeGreaterThan(1); + } finally { + server?.stop(); + } + }); + + it("response inspected size should reflect stream state", async () => { + let server: Server | null = null; + try { + const content = "Bun!\n".repeat(4); + server = Bun.serve({ + port: 0, + fetch(req) { + return new Response( + new ReadableStream({ + type: "direct", + async pull(controller) { + const 
diff --git a/test/js/web/fetch/fetch.stream.test.ts b/test/js/web/fetch/fetch.stream.test.ts
new file mode 100644
index 000000000..efef6a161
--- /dev/null
+++ b/test/js/web/fetch/fetch.stream.test.ts
@@ -0,0 +1,1129 @@
+import { Socket, Server, TCPSocketListener } from "bun";
+import { readFileSync } from "fs";
+import { join } from "path";
+import { describe, expect, it } from "bun:test";
+import { gcTick } from "harness";
+
+const fixtures = {
+  "fixture": readFileSync(join(import.meta.dir, "fixture.html")),
+  "fixture.png": readFileSync(join(import.meta.dir, "fixture.png")),
+  "fixture.png.gz": readFileSync(join(import.meta.dir, "fixture.png.gz")),
+};
+
+const invalid = Buffer.from([0xc0]);
+
+const bigText = Buffer.from("a".repeat(1 * 1024 * 1024));
+const smallText = Buffer.from("Hello".repeat(16));
+const empty = Buffer.alloc(0);
+
+describe("fetch() with streaming", () => {
+  it("stream still works after response gets out of scope", async () => {
+    let server: Server | null = null;
+    try {
+      const content = "Hello, world!\n".repeat(5);
+      server = Bun.serve({
+        port: 0,
+        fetch(req) {
+          return new Response(
+            new ReadableStream({
+              type: "direct",
+              async pull(controller) {
+                const data = Buffer.from(content, "utf8");
+                const size = data.byteLength / 5;
+                controller.write(data.slice(0, size));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size, size * 2));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 2, size * 3));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 3, size * 5));
+                await controller.flush();
+
+                controller.close();
+              },
+            }),
+            { status: 200, headers: { "Content-Type": "text/plain" } },
+          );
+        },
+      });
+
+      async function getReader() {
+        return (await fetch(`http://${server.hostname}:${server.port}`, {})).body?.getReader();
+      }
+      gcTick(false);
+      const reader = await getReader();
+      gcTick(false);
+      let buffer = Buffer.alloc(0);
+      let parts = 0;
+      while (true) {
+        gcTick(false);
+
+        const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+        if (value) {
+          buffer = Buffer.concat([buffer, value]);
+        }
+        parts++;
+        if (done) {
+          break;
+        }
+      }
+
+      gcTick(false);
+      expect(buffer.toString("utf8")).toBe(content);
+      expect(parts).toBeGreaterThan(1);
+    } finally {
+      server?.stop();
+    }
+  });
+
+  it("response inspected size should reflect stream state", async () => {
+    let server: Server | null = null;
+    try {
+      const content = "Bun!\n".repeat(4);
+      server = Bun.serve({
+        port: 0,
+        fetch(req) {
+          return new Response(
+            new ReadableStream({
+              type: "direct",
+              async pull(controller) {
+                const data = Buffer.from(content, "utf8");
+                const size = data.byteLength / 5;
+                controller.write(data.slice(0, size));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size, size * 2));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 2, size * 3));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 3, size * 4));
+                await controller.flush();
+
+                controller.close();
+              },
+            }),
+            { status: 200, headers: { "Content-Type": "text/plain" } },
+          );
+        },
+      });
+
+      function inspectBytes(response: Response) {
+        const match = /Response \(([0-9]+ )bytes\)/g.exec(
+          Bun.inspect(response, {
+            depth: 0,
+          }),
+        );
+        if (!match) return 0;
+        return parseInt(match[1]?.trim(), 10);
+      }
+
+      const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+      gcTick(false);
+      const reader = res.body?.getReader();
+      gcTick(false);
+      let size = 0;
+      while (true) {
+        gcTick(false);
+
+        const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+        if (value) {
+          size += value.length;
+        }
+        expect(inspectBytes(res)).toBe(size);
+        if (done) {
+          break;
+        }
+      }
+
+      gcTick(false);
+    } finally {
+      server?.stop();
+    }
+  });
+
+  it("can handle multiple simultaneous requests", async () => {
+    let server: Server | null = null;
+    try {
+      const content = "Hello, world!\n".repeat(5);
+      server = Bun.serve({
+        port: 0,
+        fetch(req) {
+          return new Response(
+            new ReadableStream({
+              type: "direct",
+              async pull(controller) {
+                const data = Buffer.from(content, "utf8");
+                const size = data.byteLength / 5;
+                controller.write(data.slice(0, size));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size, size * 2));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 2, size * 3));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 3, size * 5));
+                await controller.flush();
+
+                controller.close();
+              },
+            }),
+            {
+              status: 200,
+              headers: {
+                "Content-Type": "text/plain",
+              },
+            },
+          );
+        },
+      });
+
+      const server_url = `http://${server.hostname}:${server.port}`;
+      async function doRequest() {
+        await Bun.sleep(10);
+        const res = await fetch(server_url);
+        const reader = res.body?.getReader();
+        let buffer = Buffer.alloc(0);
+        let parts = 0;
+        while (true) {
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+            parts++;
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+        expect(parts).toBeGreaterThan(1);
+      }
+
+      await Promise.all([doRequest(), doRequest(), doRequest(), doRequest(), doRequest(), doRequest()]);
+    } finally {
+      server?.stop();
+    }
+  });
+
+  it(`can handle transforms`, async () => {
+    let server: Server | null = null;
+    try {
+      const content = "Hello, world!\n".repeat(5);
+      server = Bun.serve({
+        port: 0,
+        fetch(req) {
+          return new Response(
+            new ReadableStream({
+              type: "direct",
+              async pull(controller) {
+                const data = Buffer.from(content, "utf8");
+                const size = data.byteLength / 5;
+                controller.write(data.slice(0, size));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size, size * 2));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 2, size * 3));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 3, size * 5));
+                await controller.flush();
+
+                controller.close();
+              },
+            }),
+            {
+              status: 200,
+              headers: {
+                "Content-Type": "text/plain",
+              },
+            },
+          );
+        },
+      });
+
+      const server_url = `http://${server.hostname}:${server.port}`;
+      const res = await fetch(server_url);
+
+      const transform = new TransformStream({
+        transform(chunk, controller) {
+          controller.enqueue(Buffer.from(chunk).toString("utf8").toUpperCase());
+        },
+      });
+
+      const reader = res.body?.pipeThrough(transform).getReader();
+
+      let result = "";
+      while (true) {
+        const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+        if (value) {
+          result += value;
+        }
+        if (done) {
+          break;
+        }
+      }
+
+      gcTick(false);
+      expect(result).toBe(content.toUpperCase());
+    } finally {
+      server?.stop();
+    }
+  });
+
+  it(`can handle gz images`, async () => {
+    let server: Server | null = null;
+    try {
+      server = Bun.serve({
+        port: 0,
+        fetch(req) {
+          const data = fixtures["fixture.png.gz"];
+          return new Response(data, {
+            status: 200,
+            headers: {
+              "Content-Type": "text/plain",
+              "Content-Encoding": "gzip",
+            },
+          });
+        },
+      });
+
+      const server_url = `http://${server.hostname}:${server.port}`;
+      const res = await fetch(server_url);
+
+      const reader = res.body?.getReader();
+
+      let buffer = Buffer.alloc(0);
+      while (true) {
+        const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+        if (value) {
+          buffer = Buffer.concat([buffer, value]);
+        }
+        if (done) {
+          break;
+        }
+      }
+
+      gcTick(false);
+      expect(buffer).toEqual(fixtures["fixture.png"]);
+    } finally {
+      server?.stop();
+    }
+  });
+
+  it(`can proxy fetch with Bun.serve`, async () => {
+    let server: Server | null = null;
+    let server_original: Server | null = null;
+    try {
+      const content = "a".repeat(64 * 1024);
+
+      server_original = Bun.serve({
+        port: 0,
+        fetch(req) {
+          return new Response(
+            new ReadableStream({
+              type: "direct",
+              async pull(controller) {
+                const data = Buffer.from(content, "utf8");
+                const size = data.byteLength / 5;
+                controller.write(data.slice(0, size));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size, size * 2));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 2, size * 3));
+                await controller.flush();
+                await Bun.sleep(100);
+                controller.write(data.slice(size * 3, size * 5));
+                await controller.flush();
+
+                controller.close();
+              },
+            }),
+            {
+              status: 200,
+              headers: {
+                "Content-Type": "text/plain",
+              },
+            },
+          );
+        },
+      });
+
+      server = Bun.serve({
+        port: 0,
+        async fetch(req) {
+          const response = await fetch(`http://${server_original.hostname}:${server_original.port}`, {});
+          await Bun.sleep(10);
+          return new Response(response.body, {
+            status: 200,
+            headers: {
+              "Content-Type": "text/plain",
+            },
+          });
+        },
+      });
+
+      let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+      gcTick(false);
+      const reader = res.body?.getReader();
+
+      let buffer = Buffer.alloc(0);
+      let parts = 0;
+      while (true) {
+        gcTick(false);
+
+        const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+        if (value) {
+          buffer = Buffer.concat([buffer, value]);
+          parts++;
+        }
+        if (done) {
+          break;
+        }
+      }
+
+      gcTick(false);
+      expect(buffer.toString("utf8")).toBe(content);
+      expect(parts).toBeGreaterThanOrEqual(1);
+    } finally {
+      server?.stop();
+      server_original?.stop();
+    }
+  });
+  const matrix = [
+    { name: "small", data: fixtures["fixture"] },
+    { name: "small text", data: smallText },
+    { name: "big text", data: bigText },
+    { name: "img", data: fixtures["fixture.png"] },
+    { name: "empty", data: empty },
+  ];
+  for (let i = 0; i < matrix.length; i++) {
+    const fixture = matrix[i];
+    for (let j = 0; j < matrix.length; j++) {
+      const fixtureb = matrix[j];
+      const test = fixture.name == "empty" && fixtureb.name == "empty" ? it.todo : it;
+      test(`can handle fixture ${fixture.name} x ${fixtureb.name}`, async () => {
+        let server: Server | null = null;
+        try {
+          //@ts-ignore
+          const data = fixture.data;
+          //@ts-ignore
+          const data_b = fixtureb.data;
+          const content = Buffer.concat([data, data_b]);
+          server = Bun.serve({
+            port: 0,
+            fetch(req) {
+              return new Response(
+                new ReadableStream({
+                  type: "direct",
+                  async pull(controller) {
+                    controller.write(data);
+                    await controller.flush();
+                    await Bun.sleep(100);
+                    controller.write(data_b);
+                    await controller.flush();
+                    controller.close();
+                  },
+                }),
+                {
+                  status: 200,
+                  headers: {
+                    "Content-Type": "text/plain",
+                  },
+                },
+              );
+            },
+          });
+
+          const server_url = `http://${server.hostname}:${server.port}`;
+          const res = await fetch(server_url);
+          const reader = res.body?.getReader();
+          let buffer = Buffer.alloc(0);
+          while (true) {
+            const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+            if (value) {
+              buffer = Buffer.concat([buffer, value]);
+            }
+            if (done) {
+              break;
+            }
+          }
+          gcTick(false);
+          expect(buffer).toEqual(content);
+        } finally {
+          server?.stop();
+        }
+      });
+    }
+  }
+
+  type CompressionType = "no" | "gzip" | "deflate" | "br";
+  type TestType = { headers: Record<string, string>; compression: CompressionType; skip?: boolean };
+  const types: Array<TestType> = [
+    { headers: {}, compression: "no" },
+    { headers: { "Content-Encoding": "gzip" }, compression: "gzip" },
+    { headers: { "Content-Encoding": "deflate" }, compression: "deflate" },
+    // { headers: { "Content-Encoding": "br" }, compression: "br", skip: true }, // not implemented yet
+  ];
+
+  function compress(compression: CompressionType, data: Uint8Array) {
+    switch (compression) {
+      case "gzip":
+        return Bun.gzipSync(data);
+      case "deflate":
+        return Bun.deflateSync(data);
+      default:
+        return data;
+    }
+  }
+
+  for (const { headers, compression, skip } of types) {
+    const test = skip ? it.skip : it;
+
+    test(`with invalid utf8 with ${compression} compression`, async () => {
+      let server: Server | null = null;
+      try {
+        const content = Buffer.concat([invalid, Buffer.from("Hello, world!\n".repeat(5), "utf8"), invalid]);
+        server = Bun.serve({
+          port: 0,
+          fetch(req) {
+            return new Response(
+              new ReadableStream({
+                type: "direct",
+                async pull(controller) {
+                  const data = compress(compression, content);
+                  const size = data.byteLength / 4;
+                  controller.write(data.slice(0, size));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size, size * 2));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size * 2, size * 3));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size * 3, size * 4));
+                  await controller.flush();
+
+                  controller.close();
+                },
+              }),
+              {
+                status: 200,
+                headers: {
+                  "Content-Type": "text/plain",
+                  ...headers,
+                },
+              },
+            );
+          },
+        });
+
+        let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer).toEqual(content);
+      } finally {
+        server?.stop();
+      }
+    });
+
+    test(`chunked response works (single chunk) with ${compression} compression`, async () => {
+      let server: Server | null = null;
+      try {
+        const content = "Hello, world!\n".repeat(5);
+        server = Bun.serve({
+          port: 0,
+          fetch(req) {
+            return new Response(
+              new ReadableStream({
+                type: "direct",
+                async pull(controller) {
+                  const data = compress(compression, Buffer.from(content, "utf8"));
+                  controller.write(data);
+                  await controller.flush();
+                  controller.close();
+                },
+              }),
+              {
+                status: 200,
+                headers: {
+                  "Content-Type": "text/plain",
+                  ...headers,
+                },
+              },
+            );
+          },
+        });
+        let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const result = await res.text();
+        gcTick(false);
+        expect(result).toBe(content);
+
+        res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        let parts = 0;
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+            parts++;
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+        expect(parts).toBe(1);
+      } finally {
+        server?.stop();
+      }
+    });
+
+    test(`chunked response works (multiple chunks) with ${compression} compression`, async () => {
+      let server: Server | null = null;
+      try {
+        const content = "Hello, world!\n".repeat(5);
+        server = Bun.serve({
+          port: 0,
+          fetch(req) {
+            return new Response(
+              new ReadableStream({
+                type: "direct",
+                async pull(controller) {
+                  const data = compress(compression, Buffer.from(content, "utf8"));
+                  const size = data.byteLength / 5;
+                  controller.write(data.slice(0, size));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size, size * 2));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size * 2, size * 3));
+                  await controller.flush();
+                  await Bun.sleep(100);
+                  controller.write(data.slice(size * 3, size * 5));
+                  await controller.flush();
+
+                  controller.close();
+                },
+              }),
+              {
+                status: 200,
+                headers: {
+                  "Content-Type": "text/plain",
+                  ...headers,
+                },
+              },
+            );
+          },
+        });
+        let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const result = await res.text();
+        gcTick(false);
+        expect(result).toBe(content);
+
+        res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        let parts = 0;
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+          }
+          parts++;
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+        expect(parts).toBeGreaterThan(1);
+      } finally {
+        server?.stop();
+      }
+    });
+
+    test(`Content-Length response works (single part) with ${compression} compression`, async () => {
+      let server: Server | null = null;
+      try {
+        const content = "a".repeat(1024);
+
+        server = Bun.serve({
+          port: 0,
+          fetch(req) {
+            return new Response(compress(compression, Buffer.from(content)), {
+              status: 200,
+              headers: {
+                "Content-Type": "text/plain",
+                ...headers,
+              },
+            });
+          },
+        });
+        let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const result = await res.text();
+        gcTick(false);
+        expect(result).toBe(content);
+
+        res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        let parts = 0;
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+            parts++;
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+        expect(parts).toBe(1);
+      } finally {
+        server?.stop();
+      }
+    });
+
+    test(`Content-Length response works (multiple parts) with ${compression} compression`, async () => {
+      let server: Server | null = null;
+      try {
+        const content = "a".repeat(64 * 1024);
+
+        server = Bun.serve({
+          port: 0,
+          fetch(req) {
+            return new Response(compress(compression, Buffer.from(content)), {
+              status: 200,
+              headers: {
+                "Content-Type": "text/plain",
+                ...headers,
+              },
+            });
+          },
+        });
+        let res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const result = await res.text();
+        gcTick(false);
+        expect(result).toBe(content);
+
+        res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        let parts = 0;
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+            parts++;
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+        expect(parts).toBeGreaterThan(1);
+      } finally {
+        server?.stop();
+      }
+    });
+
+    test(`Extra data should be ignored on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {
+      let server: TCPSocketListener<any> | null = null;
+
+      try {
+        const parts = 5;
+        const content = "Hello".repeat(parts);
+
+        server = Bun.listen({
+          port: 0,
+          hostname: "0.0.0.0",
+          socket: {
+            async open(socket) {
+              var corked: any[] = [];
+              var cork = true;
+              async function write(chunk: any) {
+                await new Promise<void>((resolve, reject) => {
+                  if (cork) {
+                    corked.push(chunk);
+                  }
+
+                  if (!cork && corked.length) {
+                    socket.write(corked.join(""));
+                    corked.length = 0;
+                    socket.flush();
+                  }
+
+                  if (!cork) {
+                    socket.write(chunk);
+                    socket.flush();
+                  }
+
+                  resolve();
+                });
+              }
+              const compressed = compress(compression, Buffer.from(content, "utf8"));
+              await write("HTTP/1.1 200 OK\r\n");
+              await write("Content-Type: text/plain\r\n");
+              for (const [key, value] of Object.entries(headers)) {
+                await write(key + ": " + value + "\r\n");
+              }
+              await write("Content-Length: " + compressed.byteLength + "\r\n");
+              await write("\r\n");
+              const size = compressed.byteLength / 5;
+              for (var i = 0; i < 5; i++) {
+                cork = false;
+                await write(compressed.slice(size * i, size * (i + 1)));
+              }
+              await write("Extra Data!");
+              await write("Extra Data!");
+              socket.flush();
+            },
+            drain(socket) {},
+          },
+        });
+
+        const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+        const reader = res.body?.getReader();
+
+        let buffer = Buffer.alloc(0);
+        while (true) {
+          gcTick(false);
+
+          const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+          if (value) {
+            buffer = Buffer.concat([buffer, value]);
+          }
+          if (done) {
+            break;
+          }
+        }
+
+        gcTick(false);
+        expect(buffer.toString("utf8")).toBe(content);
+      } finally {
+        server?.stop(true);
+      }
+    });
+
+    test(`Missing data should timeout on streaming (multiple chunks, TCP server) with ${compression} compression`, async () => {
+      let server: TCPSocketListener<any> | null = null;
+
+      try {
+        const parts = 5;
+        const content = "Hello".repeat(parts);
+
+        server = Bun.listen({
+          port: 0,
+          hostname: "0.0.0.0",
+          socket: {
+            async open(socket) {
+              var corked: any[] = [];
+              var cork = true;
+              async function write(chunk: any) {
+                await new Promise<void>((resolve, reject) => {
+                  if (cork) {
+                    corked.push(chunk);
+                  }
+
+                  if (!cork && corked.length) {
+                    socket.write(corked.join(""));
+                    corked.length = 0;
+                    socket.flush();
+                  }
+
+                  if (!cork) {
+                    socket.write(chunk);
+                    socket.flush();
+                  }
+
+                  resolve();
+                });
+              }
+              const compressed = compress(compression, Buffer.from(content, "utf8"));
+              await write("HTTP/1.1 200 OK\r\n");
+              await write("Content-Type: text/plain\r\n");
+              for (const [key, value] of Object.entries(headers)) {
+                await write(key + ": " + value + "\r\n");
+              }
+              // advertise 10 extra bytes that we will never send
+              await write("Content-Length: " + (compressed.byteLength + 10) + "\r\n");
+              await write("\r\n");
+              const size = compressed.byteLength / 5;
+              for (var i = 0; i < 5; i++) {
+                cork = false;
+                await write(compressed.slice(size * i, size * (i + 1)));
+              }
+              socket.flush();
+            },
+            drain(socket) {},
+          },
+        });
+
+        const res = await fetch(`http://${server.hostname}:${server.port}`, {
+          signal: AbortSignal.timeout(1000),
+        });
+        gcTick(false);
+        try {
+          const reader = res.body?.getReader();
+
+          let buffer = Buffer.alloc(0);
+          while (true) {
+            gcTick(false);
+
+            const { done, value } = (await reader?.read()) as ReadableStreamDefaultReadResult<any>;
+            if (value) {
+              buffer = Buffer.concat([buffer, value]);
+            }
+            if (done) {
+              break;
+            }
+          }
+
+          gcTick(false);
+          expect(buffer.toString("utf8")).toBe("unreachable");
+        } catch (err) {
+          expect((err as Error).name).toBe("TimeoutError");
+        }
+      } finally {
+        server?.stop(true);
+      }
+    });
+
+    if (compression !== "no") {
+      test(`can handle corrupted ${compression} compression`, async () => {
+        let server: TCPSocketListener<any> | null = null;
+
+        try {
+          const parts = 5;
+          const content = "Hello".repeat(parts);
+          server = Bun.listen({
+            port: 0,
+            hostname: "0.0.0.0",
+            socket: {
+              async open(socket) {
+                var corked: any[] = [];
+                var cork = true;
+                async function write(chunk: any) {
+                  await new Promise<void>((resolve, reject) => {
+                    if (cork) {
+                      corked.push(chunk);
+                    }
+
+                    if (!cork && corked.length) {
+                      socket.write(corked.join(""));
+                      corked.length = 0;
+                      socket.flush();
+                    }
+
+                    if (!cork) {
+                      socket.write(chunk);
+                      socket.flush();
+                    }
+
+                    resolve();
+                  });
+                }
+                const compressed = compress(compression, Buffer.from(content, "utf8"));
+                await write("HTTP/1.1 200 OK\r\n");
+                await write("Content-Type: text/plain\r\n");
+                for (const [key, value] of Object.entries(headers)) {
+                  await write(key + ": " + value + "\r\n");
+                }
+                // the advertised Content-Length is correct here; the payload is corrupted below
+                await write("Content-Length: " + compressed.byteLength + "\r\n");
+                await write("\r\n");
+                const size = compressed.byteLength / 5;
+                compressed[0] = 0; // corrupt data
+                cork = false;
+                for (var i = 0; i < 5; i++) {
+                  await write(compressed.slice(size * i, size * (i + 1)));
+                }
+                socket.flush();
+              },
+              drain(socket) {},
+            },
+          });
+
+          const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+          gcTick(false);
+
+          try {
+            const reader = res.body?.getReader();
+
+            let buffer = Buffer.alloc(0);
+
+            while (true) {
+              gcTick(false);
+              const read_promise = reader?.read();
+              const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>;
+
+              if (value) {
+                buffer = Buffer.concat([buffer, value]);
+              }
+
+              if (done) {
+                break;
+              }
+            }
+
+            gcTick(false);
+            expect(buffer.toString("utf8")).toBe("unreachable");
+          } catch (err) {
+            expect((err as Error).name).toBe("ZlibError");
+          }
+        } finally {
+          server?.stop(true);
+        }
+      });
+    }
+
+    test(`can handle socket close with ${compression} compression`, async () => {
+      let server: TCPSocketListener<any> | null = null;
+
+      try {
+        const parts = 5;
+        const content = "Hello".repeat(parts);
+        const { promise, resolve: resolveSocket } = Promise.withResolvers<Socket>();
+        server = Bun.listen({
+          port: 0,
+          hostname: "0.0.0.0",
+          socket: {
+            async open(socket) {
+              var corked: any[] = [];
+              var cork = true;
+              async function write(chunk: any) {
+                await new Promise<void>((resolve, reject) => {
+                  if (cork) {
+                    corked.push(chunk);
+                  }
+
+                  if (!cork && corked.length) {
+                    socket.write(corked.join(""));
+                    corked.length = 0;
+                    socket.flush();
+                  }
+
+                  if (!cork) {
+                    socket.write(chunk);
+                    socket.flush();
+                  }
+
+                  resolve();
+                });
+              }
+              const compressed = compress(compression, Buffer.from(content, "utf8"));
+              await write("HTTP/1.1 200 OK\r\n");
+              await write("Content-Type: text/plain\r\n");
+              for (const [key, value] of Object.entries(headers)) {
+                await write(key + ": " + value + "\r\n");
+              }
+              // advertise 10 extra bytes that are never sent, so the stream is still open when the socket closes
+              await write("Content-Length: " + (compressed.byteLength + 10) + "\r\n");
+              await write("\r\n");
+              const size = compressed.byteLength / 5;
+              for (var i = 0; i < 5; i++) {
+                cork = false;
+                await write(compressed.slice(size * i, size * (i + 1)));
+              }
+              socket.flush();
+              resolveSocket(socket);
+            },
+            drain(socket) {},
+          },
+        });
+
+        const res = await fetch(`http://${server.hostname}:${server.port}`, {});
+        gcTick(false);
+
+        let socket: Socket | null = await promise;
+        try {
+          const reader = res.body?.getReader();
+
+          let buffer = Buffer.alloc(0);
+
+          while (true) {
+            gcTick(false);
+            const read_promise = reader?.read();
+            socket?.end();
+            socket = null;
+            const { done, value } = (await read_promise) as ReadableStreamDefaultReadResult<any>;
+
+            if (value) {
+              buffer = Buffer.concat([buffer, value]);
+            }
+
+            if (done) {
+              break;
+            }
+          }
+
+          gcTick(false);
+          expect(buffer.toString("utf8")).toBe("unreachable");
+        } catch (err) {
+          expect((err as Error).name).toBe("ConnectionClosed");
+        }
+      } finally {
+        server?.stop(true);
+      }
+    });
+  }
+});
diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts
index a381cb320..59847dde9 100644
--- a/test/js/web/fetch/fetch.test.ts
+++ b/test/js/web/fetch/fetch.test.ts
@@ -4,7 +4,7 @@ import { chmodSync, mkdtempSync, readFileSync, realpathSync, rmSync, writeFileSy
 import { mkfifo } from "mkfifo";
 import { tmpdir } from "os";
 import { join } from "path";
-import { gc, withoutAggressiveGC } from "harness";
+import { gc, withoutAggressiveGC, gcTick } from "harness";

 const tmp_dir = mkdtempSync(join(realpathSync(tmpdir()), "fetch.test"));

@@ -1334,11 +1334,18 @@ it("fetch() file:// works", async () => {
   expect(await (await fetch(new URL("fetch.test.ts", import.meta.url))).text()).toEqual(
     await Bun.file(Bun.fileURLToPath(new URL("fetch.test.ts", import.meta.url))).text(),
   );
-  expect(await (await fetch(new URL("file with space in the name.txt", import.meta.url))).text()).toEqual(
-    await Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url))).text(),
-  );
+  gc(true);
+  var fileResponse = await fetch(new URL("file with space in the name.txt", import.meta.url));
+  gc(true);
+  var fileResponseText = await fileResponse.text();
+  gc(true);
+  var bunFile = Bun.file(Bun.fileURLToPath(new URL("file with space in the name.txt", import.meta.url)));
+  gc(true);
+  var bunFileText = await bunFile.text();
+  gc(true);
+  expect(fileResponseText).toEqual(bunFileText);
+  gc(true);
 });
-
 it("cloned response headers are independent before accessing", () => {
   const response = new Response("hello", {
     headers: {
diff --git a/test/js/web/fetch/fixture.png b/test/js/web/fetch/fixture.png
new file mode 100644
index 000000000..b34dbd29d
--- /dev/null
+++ b/test/js/web/fetch/fixture.png
Binary files differ
diff --git a/test/js/web/fetch/fixture.png.gz b/test/js/web/fetch/fixture.png.gz
new file mode 100644
index 000000000..8c0071811
--- /dev/null
+++ b/test/js/web/fetch/fixture.png.gz
Binary files differ
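One pattern from the suite above worth calling out is the proxy test: because fetch() bodies now stream, an upstream `response.body` can be handed directly to a new `Response` and forwarded chunk by chunk instead of being buffered. A condensed, standalone sketch of that pattern (the origin URL is a placeholder, not part of the commit):

```ts
import { serve } from "bun";

// Forward an upstream streaming body through Bun.serve without buffering it.
const proxy = serve({
  port: 0,
  async fetch() {
    const upstream = await fetch("http://localhost:8080/"); // placeholder origin
    // Passing upstream.body straight through streams each decoded chunk
    // to the client as soon as it arrives.
    return new Response(upstream.body, {
      status: upstream.status,
      headers: { "Content-Type": "text/plain" },
    });
  },
});
console.log(`proxy on http://${proxy.hostname}:${proxy.port}`);
```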