diff options
| author | 2023-08-23 14:05:05 -0700 | |
|---|---|---|
| committer | 2023-08-23 14:05:05 -0700 | |
| commit | c60385716b7a7ac9f788cdf7dfe37250321e0670 (patch) | |
| tree | b08cc97e7e9d456efac7ec83d4862c8a8e3043bf /src/bun.js/event_loop.zig | |
| parent | f3266ff436e0ed2aedd0d81f47a1ef104191a2c9 (diff) | |
| download | bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.gz bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.tar.zst bun-c60385716b7a7ac9f788cdf7dfe37250321e0670.zip | |
Bunch of streams fixes (#4251)
* Update WebKit
* Don't do async hooks things when async hooks are not enabled
* Smarter scheduling of event loop tasks with the http server
* less exciting approach
* Bump WebKit
* Another approach
* Fix body-stream tests
* Fixes #1886
* Fix UAF in fetch body streaming
* Missing from commit
* Fix leak
* Fix the other leak
* Fix test
* Fix crash
* missing duperef
* Make this code clearer
* Ignore empty chunks
* Fixes #3969
* Delete flaky test
* Update bun-linux-build.yml
* Fix memory issue
* fix result body, and .done status before the last callback, dont touch headers after sent once
* refactor HTTPClientResult
* less flasky corrupted test
* oops
* fix mutex invalid state
* fix onProgressUpdate deinit/unlock
* fix onProgressUpdate deinit/unlock
* oops
* remove verbose
* fix posible null use
* avoid http null
* metadata can still be used onReject after toResponse
* dont leak task.http
* fix flask tests
* less flask close tests
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: cirospaciari <ciro.spaciari@gmail.com>
Diffstat (limited to 'src/bun.js/event_loop.zig')
| -rw-r--r-- | src/bun.js/event_loop.zig | 49 |
1 files changed, 47 insertions, 2 deletions
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 92874b6a4..896297060 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -509,6 +509,7 @@ comptime { } } +pub const DeferredRepeatingTask = *const (fn (*anyopaque) bool); pub const EventLoop = struct { tasks: Queue = undefined, concurrent_tasks: ConcurrentTask.Queue = ConcurrentTask.Queue{}, @@ -518,6 +519,7 @@ pub const EventLoop = struct { start_server_on_next_tick: bool = false, defer_count: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), forever_timer: ?*uws.Timer = null, + deferred_microtask_map: std.AutoArrayHashMapUnmanaged(?*anyopaque, DeferredRepeatingTask) = .{}, pub const Queue = std.fifo.LinearFifo(Task, .Dynamic); const log = bun.Output.scoped(.EventLoop, false); @@ -528,6 +530,49 @@ pub const EventLoop = struct { } } + pub fn drainMicrotasksWithVM(this: *EventLoop, vm: *JSC.VM) void { + vm.drainMicrotasks(); + this.drainDeferredTasks(); + } + + pub fn drainMicrotasks(this: *EventLoop) void { + this.drainMicrotasksWithVM(this.global.vm()); + } + + pub fn ensureAliveForOneTick(this: *EventLoop) void { + if (this.noop_task.scheduled) return; + this.enqueueTask(Task.init(&this.noop_task)); + this.noop_task.scheduled = true; + } + + pub fn registerDeferredTask(this: *EventLoop, ctx: ?*anyopaque, task: DeferredRepeatingTask) bool { + const existing = this.deferred_microtask_map.getOrPutValue(this.virtual_machine.allocator, ctx, task) catch unreachable; + return existing.found_existing; + } + + pub fn unregisterDeferredTask(this: *EventLoop, ctx: ?*anyopaque) bool { + return this.deferred_microtask_map.swapRemove(ctx); + } + + fn drainDeferredTasks(this: *EventLoop) void { + var i: usize = 0; + var last = this.deferred_microtask_map.count(); + while (i < last) { + var key = this.deferred_microtask_map.keys()[i] orelse { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + continue; + }; + + if (!this.deferred_microtask_map.values()[i](key)) { + this.deferred_microtask_map.swapRemoveAt(i); + last = this.deferred_microtask_map.count(); + } else { + i += 1; + } + } + } + pub fn tickWithCount(this: *EventLoop) u32 { var global = this.global; var global_vm = global.vm(); @@ -621,7 +666,7 @@ pub const EventLoop = struct { } global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); } this.tasks.head = if (this.tasks.count == 0) 0 else this.tasks.head; @@ -758,7 +803,7 @@ pub const EventLoop = struct { this.tickConcurrent(); } else { global_vm.releaseWeakRefs(); - global_vm.drainMicrotasks(); + this.drainMicrotasksWithVM(global_vm); this.tickConcurrent(); if (this.tasks.count > 0) continue; } |
