aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar cirospaciari <ciro.spaciari@gmail.com> 2023-09-07 16:21:42 -0300
committerGravatar cirospaciari <ciro.spaciari@gmail.com> 2023-09-07 16:21:42 -0300
commit3260fdd7902d099201f6bb68fd862be56d1f0893 (patch)
treef6e45e6a3fc75fd18165589b8820e8ac61971736
parentab0cb34cf0fa0ac8d9c89af6994f7a79cd86427e (diff)
downloadbun-3260fdd7902d099201f6bb68fd862be56d1f0893.tar.gz
bun-3260fdd7902d099201f6bb68fd862be56d1f0893.tar.zst
bun-3260fdd7902d099201f6bb68fd862be56d1f0893.zip
consume multiple, only 1 task per tick
-rw-r--r--src/bun.js/webcore/response.zig33
1 files changed, 27 insertions, 6 deletions
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 4d73e6dca..2d2eb051e 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -680,6 +680,7 @@ pub const Fetch = struct {
response_buffer: MutableString = undefined,
// all shedule buffers are stored here when not streaming
scheduled_response_buffer: MutableString = undefined,
+ has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
/// response strong ref
response: JSC.Strong = .{},
/// stream strong ref if any is available
@@ -956,12 +957,7 @@ pub const Fetch = struct {
}
}
- pub fn onProgressUpdate(this: *FetchTasklet) void {
- JSC.markBinding(@src());
- log("onProgressUpdate", .{});
- var result = this.result_queue.pop() orelse return;
- defer result.deinit();
-
+ pub fn processResult(this: *FetchTasklet, result: *HTTPClientQueueResult) void {
if (this.is_waiting_body) {
return this.onBodyReceived(result);
}
@@ -1030,6 +1026,10 @@ pub const Fetch = struct {
if (this.metadata == null) {
log("onProgressUpdate: metadata is null", .{});
// cannot continue without metadata
+ if (result.body.list.items.len > 0) {
+ // looks like we have some data to save until next time
+ _ = this.scheduled_response_buffer.write(result.body.list.items) catch @panic("OOM");
+ }
return;
}
}
@@ -1067,6 +1067,22 @@ pub const Fetch = struct {
},
}
}
+ pub fn onProgressUpdate(this: *FetchTasklet) void {
+ JSC.markBinding(@src());
+ log("onProgressUpdate", .{});
+
+ var has_more = false;
+ while (this.result_queue.pop()) |result| {
+ defer result.deinit();
+ has_more = result.has_more;
+ this.processResult(result);
+ }
+
+ if (has_more) {
+ this.has_schedule_callback.store(false, .Monotonic);
+ return;
+ }
+ }
pub fn checkServerIdentity(this: *FetchTasklet, certificate_info: HTTPClient.CertificateInfo, result: *HTTPClientQueueResult) bool {
if (this.check_server_identity.get()) |check_server_identity| {
@@ -1442,6 +1458,11 @@ pub const Fetch = struct {
task.size_hint = item.getSizeHint();
task.result_queue.push(item);
task.response_buffer.reset();
+ if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
+ if (has_schedule_callback) {
+ return;
+ }
+ }
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};