diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/api/bun.zig | 26 | ||||
-rw-r--r-- | src/bun.js/api/html_rewriter.zig | 7 | ||||
-rw-r--r-- | src/bun.js/api/server.zig | 61 | ||||
-rw-r--r-- | src/bun.js/bindings/bindings.cpp | 36 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/HTTPHeaderMap.cpp | 14 | ||||
-rw-r--r-- | src/bun.js/bindings/webcore/HTTPHeaderMap.h | 1 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 141 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 76 | ||||
-rw-r--r-- | src/bun_js.zig | 2 | ||||
-rw-r--r-- | src/deps/uws.zig | 8 |
10 files changed, 227 insertions, 145 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig index 7044b9258..1171d804e 100644 --- a/src/bun.js/api/bun.zig +++ b/src/bun.js/api/bun.zig @@ -2324,6 +2324,15 @@ pub const Timer = struct { .repeat = timer_id.repeat > 0, }; + // This allows us to: + // - free the memory before the job is run + // - reuse the JSC.Strong + if (!repeats) { + this.callback = .{}; + map.put(vm.allocator, timer_id.id, null) catch unreachable; + this.deinit(); + } + var job = vm.allocator.create(CallbackJob) catch @panic( "Out of memory while allocating Timeout", ); @@ -2333,15 +2342,6 @@ pub const Timer = struct { job.ref.ref(vm); vm.enqueueTask(JSC.Task.init(&job.task)); - - // This allows us to: - // - free the memory before the job is run - // - reuse the JSC.Strong - if (!repeats) { - this.callback = .{}; - map.put(vm.allocator, timer_id.id, null) catch unreachable; - this.deinit(); - } } pub fn deinit(this: *Timeout) void { @@ -2405,13 +2405,16 @@ pub const Timer = struct { .globalThis = globalThis, .timer = uws.Timer.create( vm.uws_event_loop.?, - true, Timeout.ID{ .id = id, .repeat = @as(u32, @boolToInt(repeat)), }, ), }; + + timeout.poll_ref.ref(vm); + map.put(vm.allocator, id, timeout) catch unreachable; + timeout.timer.set( Timeout.ID{ .id = id, @@ -2421,9 +2424,6 @@ pub const Timer = struct { interval, @as(i32, @boolToInt(repeat)) * interval, ); - timeout.poll_ref.ref(vm); - - map.put(vm.allocator, id, timeout) catch unreachable; } pub fn setTimeout( diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig index aeeaef3e8..c570c4c5a 100644 --- a/src/bun.js/api/html_rewriter.zig +++ b/src/bun.js/api/html_rewriter.zig @@ -393,7 +393,7 @@ pub const HTMLRewriter = struct { rewriter: *LOLHTML.HTMLRewriter, context: LOLHTMLContext, response: *Response, - input: JSC.WebCore.Blob = undefined, + input: JSC.WebCore.AnyBlob = undefined, pub fn init(context: LOLHTMLContext, global: *JSGlobalObject, original: *Response, builder: *LOLHTML.HTMLRewriter.Builder) JSValue { var result = bun.default_allocator.create(Response) catch unreachable; var sink = bun.default_allocator.create(BufferOutputSink) catch unreachable; @@ -456,12 +456,13 @@ pub const HTMLRewriter = struct { result.status_text = bun.default_allocator.dupe(u8, original.status_text) catch unreachable; var input = original.body.value.useAsAnyBlob(); + sink.input = input; const is_pending = input.needsToReadFile(); defer if (!is_pending) input.detach(); if (is_pending) { - input.Blob.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global); + sink.input.Blob.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global); } else if (sink.runOutputSink(input.slice(), false, false)) |error_value| { return error_value; } @@ -482,7 +483,7 @@ pub const HTMLRewriter = struct { } else if (sink.response.body.value == .Locked and @ptrToInt(sink.response.body.value.Locked.task) == @ptrToInt(sink) and sink.response.body.value.Locked.promise != null) { - sink.response.body.value.Locked.callback = null; + sink.response.body.value.Locked.onReceiveValue = null; sink.response.body.value.Locked.task = null; } diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig index 716d8deec..6a4261288 100644 --- a/src/bun.js/api/server.zig +++ b/src/bun.js/api/server.zig @@ -1494,6 +1494,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void { + value.toBlobIfPossible(); + switch (value.*) { .Error => { const err = value.Error; @@ -1536,37 +1538,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp switch (stream.ptr) { .Invalid => {}, - // fast path for Blob - .Blob => |val| { - streamLog("was Blob", .{}); - var blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis); - blob.offset = val.offset; - blob.size = val.remain; - this.blob = .{ .Blob = blob }; - - val.store.ref(); - stream.detach(this.server.globalThis); - val.deinit(); - this.renderWithBlobFromBodyValue(); - return; - }, - - // fast path for File - .File => |val| { - streamLog("was File Blob", .{}); - this.blob = .{ - .Blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis), - }; - val.store.ref(); - - // it should be lazy, file shouldn't have opened yet. - std.debug.assert(!val.started); - - stream.detach(this.server.globalThis); - val.deinit(); - this.renderWithBlobFromBodyValue(); - return; - }, + // toBlobIfPossible should've caught this + .Blob, .File => unreachable, .JavaScript, .Direct => { var pair = StreamPair{ .stream = stream, .this = this }; @@ -1589,7 +1562,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp return; } - byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this); + byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this); this.byte_stream = byte_stream; this.response_buf_owned = byte_stream.buffer.moveToUnmanaged(); @@ -1611,7 +1584,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp } // when there's no stream, we need to - lock.callback = doRenderWithBodyLocked; + lock.onReceiveValue = doRenderWithBodyLocked; lock.task = this; return; @@ -1972,7 +1945,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body"); } - pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { + pub fn onStartStreamingRequestBody(this: *RequestContext) JSC.WebCore.DrainResult { if (this.aborted) { return JSC.WebCore.DrainResult{ .aborted = void{}, @@ -2003,7 +1976,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp }; } const max_request_body_preallocate_length = 1024 * 256; - pub fn onPull(this: *RequestContext) void { + pub fn onStartBuffering(this: *RequestContext) void { const request = JSC.JSValue.c(this.request_js_object); request.ensureStillAlive(); @@ -2013,7 +1986,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (len == 0) { if (request.as(Request)) |req| { var old = req.body; - old.Locked.callback = null; + old.Locked.onReceiveValue = null; req.body = .{ .Empty = .{} }; old.resolve(&req.body, this.server.globalThis); return; @@ -2024,7 +1997,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp if (len >= this.server.config.max_request_body_size) { if (request.as(Request)) |req| { var old = req.body; - old.Locked.callback = null; + old.Locked.onReceiveValue = null; req.body = .{ .Empty = .{} }; old.toError(error.RequestBodyTooLarge, this.server.globalThis); return; @@ -2041,7 +2014,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp // no transfer-encoding if (request.as(Request)) |req| { var old = req.body; - old.Locked.callback = null; + old.Locked.onReceiveValue = null; req.body = .{ .Empty = .{} }; old.resolve(&req.body, this.server.globalThis); return; @@ -2053,12 +2026,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp this.resp.onData(*RequestContext, onBufferedBodyChunk, this); } - pub fn onPullCallback(this: *anyopaque) void { - onPull(bun.cast(*RequestContext, this)); + pub fn onStartBufferingCallback(this: *anyopaque) void { + onStartBuffering(bun.cast(*RequestContext, this)); } - pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult { - return onDrainRequestBody(bun.cast(*RequestContext, this)); + pub fn onStartStreamingRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult { + return onStartStreamingRequestBody(bun.cast(*RequestContext, this)); } pub const Export = shim.exportFunctions(.{ @@ -2544,8 +2517,8 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type { .Locked = .{ .task = ctx, .global = this.globalThis, - .onPull = RequestContext.onPullCallback, - .onDrain = RequestContext.onDrainRequestBodyCallback, + .onStartBuffering = RequestContext.onStartBufferingCallback, + .onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback, }, }; resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx); diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp index dd3cfa3ca..0ee3843f1 100644 --- a/src/bun.js/bindings/bindings.cpp +++ b/src/bun.js/bindings/bindings.cpp @@ -214,17 +214,22 @@ void WebCore__FetchHeaders__count(WebCore__FetchHeaders* headers, uint32_t* coun *count = headers->size(); *buf_len = i; } + +typedef struct ZigSliceString { + const unsigned char* ptr; + size_t len; +} ZigSliceString; + typedef struct PicoHTTPHeader { - unsigned const char* name; - size_t name_len; - unsigned const char* value; - size_t value_len; + ZigSliceString name; + ZigSliceString value; } PicoHTTPHeader; typedef struct PicoHTTPHeaders { const PicoHTTPHeader* ptr; size_t len; } PicoHTTPHeaders; + WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void* arg1) { PicoHTTPHeaders pico_headers = *reinterpret_cast<const PicoHTTPHeaders*>(arg1); @@ -233,29 +238,36 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void* if (pico_headers.len > 0) { HTTPHeaderMap map = HTTPHeaderMap(); - for (size_t j = 0; j < pico_headers.len; j++) { + size_t end = pico_headers.len; + + for (size_t j = 0; j < end; j++) { PicoHTTPHeader header = pico_headers.ptr[j]; - if (header.value_len == 0) + if (header.value.len == 0) continue; - StringView nameView = StringView(reinterpret_cast<const char*>(header.name), header.name_len); + StringView nameView = StringView(reinterpret_cast<const char*>(header.name.ptr), header.name.len); LChar* data = nullptr; - auto value = String::createUninitialized(header.value_len, data); - memcpy(data, header.value, header.value_len); + auto value = String::createUninitialized(header.value.len, data); + memcpy(data, header.value.ptr, header.value.len); HTTPHeaderName name; + // memory safety: the header names must be cloned if they're not statically known + // the value must also be cloned + // isolatedCopy() doesn't actually clone, it's only for threadlocal isolation if (WebCore::findHTTPHeaderName(nameView, name)) { - map.add(name, WTFMove(value)); + map.add(name, value); } else { - map.setUncommonHeader(nameView.toString().isolatedCopy(), WTFMove(value)); + // the case where we do not need to clone the name + // when the header name is already present in the list + // we don't have that information here, so map.setUncommonHeaderCloneName exists + map.setUncommonHeaderCloneName(nameView, value); } } headers->setInternalHeaders(WTFMove(map)); } - return headers; } WebCore::FetchHeaders* WebCore__FetchHeaders__createFromUWS(JSC__JSGlobalObject* arg0, void* arg1) diff --git a/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp b/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp index bd516cf6c..383b0a85f 100644 --- a/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp +++ b/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp @@ -118,6 +118,20 @@ void HTTPHeaderMap::setUncommonHeader(const String& name, const String& value) m_uncommonHeaders[index].value = value; } +void HTTPHeaderMap::setUncommonHeaderCloneName(const StringView name, const String& value) +{ + auto index = m_uncommonHeaders.findIf([&](auto& header) { + return equalIgnoringASCIICase(header.key, name); + }); + if (index == notFound) { + LChar* ptr = nullptr; + auto nameCopy = WTF::String::createUninitialized(name.length(), ptr); + memcpy(ptr, name.characters8(), name.length()); + m_uncommonHeaders.append(UncommonHeader { nameCopy, value }); + } else + m_uncommonHeaders[index].value = value; +} + void HTTPHeaderMap::add(const String& name, const String& value) { HTTPHeaderName headerName; diff --git a/src/bun.js/bindings/webcore/HTTPHeaderMap.h b/src/bun.js/bindings/webcore/HTTPHeaderMap.h index e95c9d9f0..0b4242b57 100644 --- a/src/bun.js/bindings/webcore/HTTPHeaderMap.h +++ b/src/bun.js/bindings/webcore/HTTPHeaderMap.h @@ -212,6 +212,7 @@ public: template <class Encoder> void encode(Encoder&) const; template <class Decoder> static WARN_UNUSED_RETURN bool decode(Decoder&, HTTPHeaderMap&); void setUncommonHeader(const String& name, const String& value); + void setUncommonHeaderCloneName(const StringView name, const String& value); private: WEBCORE_EXPORT String getUncommonHeader(const String& name) const; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index b3111e81f..5d641831f 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -1169,7 +1169,7 @@ pub const Blob = struct { bun.default_allocator.destroy(this); }, .Locked => { - value.Locked.callback = thenWrap; + value.Locked.onReceiveValue = thenWrap; value.Locked.task = this; }, } @@ -1351,7 +1351,7 @@ pub const Blob = struct { }; response.body.value.Locked.task = task; - response.body.value.Locked.callback = WriteFileWaitFromLockedValueTask.thenWrap; + response.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; return promise.asValue(ctx.ptr()).asObjectRef(); }, @@ -1380,7 +1380,7 @@ pub const Blob = struct { }; request.body.Locked.task = task; - request.body.Locked.callback = WriteFileWaitFromLockedValueTask.thenWrap; + request.body.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap; return promise.asValue(ctx.ptr()).asObjectRef(); }, @@ -4220,6 +4220,7 @@ pub const Body = struct { } else if (this.value == .InternalBlob or this.value == .InlineBlob) { try formatter.printComma(Writer, writer, enable_ansi_colors); try writer.writeAll("\n"); + try formatter.writeIndent(Writer, writer); try Blob.writeFormatForSize(this.value.size(), writer, enable_ansi_colors); } else if (this.value == .Locked) { if (this.value.Locked.readable) |stream| { @@ -4311,16 +4312,73 @@ pub const Body = struct { global: *JSGlobalObject, task: ?*anyopaque = null, + /// runs after the data is available. - callback: ?fn (ctx: *anyopaque, value: *Value) void = null, + onReceiveValue: ?fn (ctx: *anyopaque, value: *Value) void = null, + /// conditionally runs when requesting data /// used in HTTP server to ignore request bodies unless asked for it - onPull: ?fn (ctx: *anyopaque) void = null, - onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, + onStartBuffering: ?fn (ctx: *anyopaque) void = null, + + onStartStreaming: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null, deinit: bool = false, action: Action = Action.none, + pub fn toAnyBlob(this: *PendingValue) ?AnyBlob { + if (this.promise != null) + return null; + + return this.toAnyBlobAllowPromise(); + } + + pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob { + const stream = this.readable orelse return null; + + switch (stream.ptr) { + .Blob => |blobby| { + var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global); + blob.offset = blobby.offset; + blob.size = blobby.remain; + blob.store.?.ref(); + stream.detach(this.global); + stream.done(); + blobby.deinit(); + this.readable = null; + return AnyBlob{ .Blob = blob }; + }, + .File => |blobby| { + var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global); + blobby.store.ref(); + + // it should be lazy, file shouldn't have opened yet. + std.debug.assert(!blobby.started); + + stream.detach(this.global); + blobby.deinit(); + stream.done(); + this.readable = null; + return AnyBlob{ .Blob = blob }; + }, + .Bytes => |bytes| { + + // If we've received the complete body by the time this function is called + // we can avoid streaming it and convert it to a Blob + if (bytes.has_received_last_chunk) { + stream.detach(this.global); + var blob: JSC.WebCore.AnyBlob = undefined; + blob.from(bytes.buffer); + bytes.parent().deinit(); + this.readable = null; + return blob; + } + + return null; + }, + else => return null, + } + } + pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue { value.action = action; @@ -4358,9 +4416,9 @@ pub const Body = struct { const promise_value = promise.asValue(globalThis); value.promise = promise_value; - if (value.onPull) |onPull| { - value.onPull = null; - onPull(value.task.?); + if (value.onStartBuffering) |onStartBuffering| { + value.onStartBuffering = null; + onStartBuffering(value.task.?); } return promise_value; } @@ -4375,6 +4433,7 @@ pub const Body = struct { }; }; + /// This is a duplex stream! pub const Value = union(Tag) { Blob: Blob, /// Single-use Blob @@ -4387,6 +4446,19 @@ pub const Body = struct { Empty: void, Error: JSValue, + pub fn toBlobIfPossible(this: *Value) void { + if (this.* != .Locked) + return; + + if (this.Locked.toAnyBlob()) |blob| { + this.* = switch (blob) { + .Blob => .{ .Blob = blob.Blob }, + .InternalBlob => .{ .InternalBlob = blob.InternalBlob }, + .InlineBlob => .{ .InlineBlob = blob.InlineBlob }, + }; + } + } + pub fn size(this: *const Value) Blob.SizeType { return switch (this.*) { .Blob => this.Blob.size, @@ -4409,9 +4481,10 @@ pub const Body = struct { var _blob = InlineBlob{ .bytes = undefined, .was_string = was_string, + .len = @truncate(InlineBlob.IntSize, data.len), }; @memcpy(&_blob.bytes, data.ptr, data.len); - + allocator.free(data); return Value{ .InlineBlob = _blob, }; @@ -4469,8 +4542,8 @@ pub const Body = struct { .estimated_size = 0, }; - if (locked.onDrain) |drain| { - locked.onDrain = null; + if (locked.onStartStreaming) |drain| { + locked.onStartStreaming = null; drain_result = drain(locked.task.?); } @@ -4696,9 +4769,10 @@ pub const Body = struct { locked.readable = null; } - if (locked.callback) |callback| { - locked.callback = null; + if (locked.onReceiveValue) |callback| { + locked.onReceiveValue = null; callback(locked.task.?, new); + return; } if (locked.promise) |promise_| { @@ -4761,6 +4835,8 @@ pub const Body = struct { } pub fn use(this: *Value) Blob { + this.toBlobIfPossible(); + switch (this.*) { .Blob => { var new_blob = this.Blob; @@ -4802,25 +4878,28 @@ pub const Body = struct { } } + pub fn tryUseAsAnyBlob(this: *Value) ?AnyBlob { + const any_blob: AnyBlob = switch (this.*) { + .Blob => AnyBlob{ .Blob = this.Blob }, + .InternalBlob => AnyBlob{ .InternalBlob = this.InternalBlob }, + .InlineBlob => AnyBlob{ .InlineBlob = this.InlineBlob }, + .Locked => this.Locked.toAnyBlobAllowPromise() orelse return null, + else => return null, + }; + + this.* = .{ .Used = .{} }; + return any_blob; + } + pub fn useAsAnyBlob(this: *Value) AnyBlob { const any_blob: AnyBlob = switch (this.*) { .Blob => .{ .Blob = this.Blob }, .InternalBlob => .{ .InternalBlob = this.InternalBlob }, .InlineBlob => .{ .InlineBlob = this.InlineBlob }, + .Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .InlineBlob = .{ .len = 0 } }, else => .{ .Blob = Blob.initEmpty(undefined) }, }; - if (this.* == .Locked) { - if (this.Locked.promise) |prom| { - prom.unprotect(); - } - - if (this.Locked.readable) |readable| { - readable.done(); - // TODO: convert the known streams back into blobs - } - } - this.* = .{ .Used = {} }; return any_blob; } @@ -4845,9 +4924,9 @@ pub const Body = struct { } this.* = .{ .Error = error_instance }; - if (locked.callback) |callback| { - locked.callback = null; - callback(locked.task.?, this); + if (locked.onReceiveValue) |onReceiveValue| { + locked.onReceiveValue = null; + onReceiveValue(locked.task.?, this); } return; } @@ -5049,7 +5128,11 @@ pub const Request = struct { } else if (this.body == .InternalBlob or this.body == .InlineBlob) { try writer.writeAll("\n"); try formatter.writeIndent(Writer, writer); - try Blob.writeFormatForSize(this.body.slice().len, writer, enable_ansi_colors); + if (this.body.size() == 0) { + try Blob.initEmpty(undefined).writeFormat(formatter, writer, enable_ansi_colors); + } else { + try Blob.writeFormatForSize(this.body.size(), writer, enable_ansi_colors); + } } else if (this.body == .Locked) { if (this.body.Locked.readable) |stream| { try writer.writeAll("\n"); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index e39d14c93..e6f8b2378 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -737,6 +737,11 @@ pub const Sink = struct { status: Status = Status.closed, used: bool = false, + pub const pending = Sink{ + .ptr = @intToPtr(*anyopaque, 0xaaaaaaaa), + .vtable = undefined, + }; + pub const Status = enum { ready, closed, @@ -2663,6 +2668,28 @@ pub const ByteBlobLoader = struct { pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit); }; +pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; + +pub const Pipe = struct { + ctx: ?*anyopaque = null, + onPipe: ?PipeFunction = null, + + pub fn New(comptime Type: type, comptime Function: anytype) type { + return struct { + pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { + Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); + } + + pub fn init(self: *Type) Pipe { + return Pipe{ + .ctx = self, + .onPipe = pipe, + }; + } + }; + } +}; + pub const ByteStream = struct { buffer: std.ArrayList(u8) = .{ .allocator = bun.default_allocator, @@ -2676,34 +2703,12 @@ pub const ByteStream = struct { }, done: bool = false, pending_buffer: []u8 = &.{}, - pending_value: ?*JSC.napi.Ref = null, + pending_value: JSC.Strong = .{}, offset: usize = 0, highWaterMark: Blob.SizeType = 0, pipe: Pipe = .{}, size_hint: Blob.SizeType = 0, - pub const Pipe = struct { - ctx: ?*anyopaque = null, - onPipe: ?PipeFunction = null, - - pub fn New(comptime Type: type, comptime Function: anytype) type { - return struct { - pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void { - Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator); - } - - pub fn init(self: *Type) Pipe { - return Pipe{ - .ctx = self, - .onPipe = pipe, - }; - } - }; - } - }; - - pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; - pub const tag = ReadableStream.Tag.Bytes; pub fn setup(this: *ByteStream) void { @@ -2727,12 +2732,10 @@ pub const ByteStream = struct { } pub fn value(this: *@This()) JSValue { - if (this.pending_value == null) + const result = this.pending_value.get() orelse { return .zero; - - const result = this.pending_value.?.get(); - this.pending_value.?.set(.zero); - + }; + this.pending_value.clear(); return result; } @@ -2850,11 +2853,7 @@ pub const ByteStream = struct { pub fn setValue(this: *@This(), view: JSC.JSValue) void { JSC.markBinding(); - if (this.pending_value) |pending| { - pending.set(view); - } else { - this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view); - } + this.pending_value.set(this.parent().globalThis, view); } pub fn parent(this: *@This()) *Source { @@ -2921,10 +2920,7 @@ pub const ByteStream = struct { const view = this.value(); if (this.buffer.capacity > 0) this.buffer.clearAndFree(); this.done = true; - if (this.pending_value) |ref| { - this.pending_value = null; - ref.destroy(); - } + this.pending_value.deinit(); if (view != .zero) { this.pending_buffer = &.{}; @@ -2937,10 +2933,7 @@ pub const ByteStream = struct { JSC.markBinding(); if (this.buffer.capacity > 0) this.buffer.clearAndFree(); - if (this.pending_value) |ref| { - this.pending_value = null; - ref.destroy(); - } + this.pending_value.deinit(); if (!this.done) { this.done = true; @@ -3564,3 +3557,4 @@ pub fn NewReadyWatcher( // pub fn onError(this: *Streamer): anytype, // }; // } + diff --git a/src/bun_js.zig b/src/bun_js.zig index ebd0c0856..eda807188 100644 --- a/src/bun_js.zig +++ b/src/bun_js.zig @@ -149,7 +149,7 @@ pub const Run = struct { while (this.vm.eventLoop().tasks.count > 0 or this.vm.active_tasks > 0 or this.vm.uws_event_loop.?.active > 0) { this.vm.tick(); - if (this.vm.eventLoop().tickConcurrentWithCount() == 0) { + if (this.vm.uws_event_loop.?.num_polls > 0 or this.vm.uws_event_loop.?.active > 0) { this.vm.uws_event_loop.?.tick(); } } diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 4d6fa05e2..8f1adcc1d 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -261,9 +261,13 @@ pub const SocketTCP = NewSocketHandler(false); pub const SocketTLS = NewSocketHandler(true); pub const Timer = opaque { - pub fn create(loop: *Loop, falltrhough: bool, ptr: anytype) *Timer { + pub fn create(loop: *Loop, ptr: anytype) *Timer { const Type = @TypeOf(ptr); - return us_create_timer(loop, @as(i32, @boolToInt(falltrhough)), @sizeOf(Type)); + + // never fallthrough poll + // the problem is uSockets hardcodes it on the other end + // so we can never free non-fallthrough polls + return us_create_timer(loop, 0, @sizeOf(Type)); } pub fn set(this: *Timer, ptr: anytype, cb: ?fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void { |