aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/api/bun.zig26
-rw-r--r--src/bun.js/api/html_rewriter.zig7
-rw-r--r--src/bun.js/api/server.zig61
-rw-r--r--src/bun.js/bindings/bindings.cpp36
-rw-r--r--src/bun.js/bindings/webcore/HTTPHeaderMap.cpp14
-rw-r--r--src/bun.js/bindings/webcore/HTTPHeaderMap.h1
-rw-r--r--src/bun.js/webcore/response.zig141
-rw-r--r--src/bun.js/webcore/streams.zig76
-rw-r--r--src/bun_js.zig2
-rw-r--r--src/deps/uws.zig8
10 files changed, 227 insertions, 145 deletions
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index 7044b9258..1171d804e 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -2324,6 +2324,15 @@ pub const Timer = struct {
.repeat = timer_id.repeat > 0,
};
+ // This allows us to:
+ // - free the memory before the job is run
+ // - reuse the JSC.Strong
+ if (!repeats) {
+ this.callback = .{};
+ map.put(vm.allocator, timer_id.id, null) catch unreachable;
+ this.deinit();
+ }
+
var job = vm.allocator.create(CallbackJob) catch @panic(
"Out of memory while allocating Timeout",
);
@@ -2333,15 +2342,6 @@ pub const Timer = struct {
job.ref.ref(vm);
vm.enqueueTask(JSC.Task.init(&job.task));
-
- // This allows us to:
- // - free the memory before the job is run
- // - reuse the JSC.Strong
- if (!repeats) {
- this.callback = .{};
- map.put(vm.allocator, timer_id.id, null) catch unreachable;
- this.deinit();
- }
}
pub fn deinit(this: *Timeout) void {
@@ -2405,13 +2405,16 @@ pub const Timer = struct {
.globalThis = globalThis,
.timer = uws.Timer.create(
vm.uws_event_loop.?,
- true,
Timeout.ID{
.id = id,
.repeat = @as(u32, @boolToInt(repeat)),
},
),
};
+
+ timeout.poll_ref.ref(vm);
+ map.put(vm.allocator, id, timeout) catch unreachable;
+
timeout.timer.set(
Timeout.ID{
.id = id,
@@ -2421,9 +2424,6 @@ pub const Timer = struct {
interval,
@as(i32, @boolToInt(repeat)) * interval,
);
- timeout.poll_ref.ref(vm);
-
- map.put(vm.allocator, id, timeout) catch unreachable;
}
pub fn setTimeout(
diff --git a/src/bun.js/api/html_rewriter.zig b/src/bun.js/api/html_rewriter.zig
index aeeaef3e8..c570c4c5a 100644
--- a/src/bun.js/api/html_rewriter.zig
+++ b/src/bun.js/api/html_rewriter.zig
@@ -393,7 +393,7 @@ pub const HTMLRewriter = struct {
rewriter: *LOLHTML.HTMLRewriter,
context: LOLHTMLContext,
response: *Response,
- input: JSC.WebCore.Blob = undefined,
+ input: JSC.WebCore.AnyBlob = undefined,
pub fn init(context: LOLHTMLContext, global: *JSGlobalObject, original: *Response, builder: *LOLHTML.HTMLRewriter.Builder) JSValue {
var result = bun.default_allocator.create(Response) catch unreachable;
var sink = bun.default_allocator.create(BufferOutputSink) catch unreachable;
@@ -456,12 +456,13 @@ pub const HTMLRewriter = struct {
result.status_text = bun.default_allocator.dupe(u8, original.status_text) catch unreachable;
var input = original.body.value.useAsAnyBlob();
+ sink.input = input;
const is_pending = input.needsToReadFile();
defer if (!is_pending) input.detach();
if (is_pending) {
- input.Blob.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global);
+ sink.input.Blob.doReadFileInternal(*BufferOutputSink, sink, onFinishedLoading, global);
} else if (sink.runOutputSink(input.slice(), false, false)) |error_value| {
return error_value;
}
@@ -482,7 +483,7 @@ pub const HTMLRewriter = struct {
} else if (sink.response.body.value == .Locked and @ptrToInt(sink.response.body.value.Locked.task) == @ptrToInt(sink) and
sink.response.body.value.Locked.promise != null)
{
- sink.response.body.value.Locked.callback = null;
+ sink.response.body.value.Locked.onReceiveValue = null;
sink.response.body.value.Locked.task = null;
}
diff --git a/src/bun.js/api/server.zig b/src/bun.js/api/server.zig
index 716d8deec..6a4261288 100644
--- a/src/bun.js/api/server.zig
+++ b/src/bun.js/api/server.zig
@@ -1494,6 +1494,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn doRenderWithBody(this: *RequestContext, value: *JSC.WebCore.Body.Value) void {
+ value.toBlobIfPossible();
+
switch (value.*) {
.Error => {
const err = value.Error;
@@ -1536,37 +1538,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
switch (stream.ptr) {
.Invalid => {},
- // fast path for Blob
- .Blob => |val| {
- streamLog("was Blob", .{});
- var blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis);
- blob.offset = val.offset;
- blob.size = val.remain;
- this.blob = .{ .Blob = blob };
-
- val.store.ref();
- stream.detach(this.server.globalThis);
- val.deinit();
- this.renderWithBlobFromBodyValue();
- return;
- },
-
- // fast path for File
- .File => |val| {
- streamLog("was File Blob", .{});
- this.blob = .{
- .Blob = JSC.WebCore.Blob.initWithStore(val.store, this.server.globalThis),
- };
- val.store.ref();
-
- // it should be lazy, file shouldn't have opened yet.
- std.debug.assert(!val.started);
-
- stream.detach(this.server.globalThis);
- val.deinit();
- this.renderWithBlobFromBodyValue();
- return;
- },
+ // toBlobIfPossible should've caught this
+ .Blob, .File => unreachable,
.JavaScript, .Direct => {
var pair = StreamPair{ .stream = stream, .this = this };
@@ -1589,7 +1562,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
return;
}
- byte_stream.pipe = JSC.WebCore.ByteStream.Pipe.New(@This(), onPipe).init(this);
+ byte_stream.pipe = JSC.WebCore.Pipe.New(@This(), onPipe).init(this);
this.byte_stream = byte_stream;
this.response_buf_owned = byte_stream.buffer.moveToUnmanaged();
@@ -1611,7 +1584,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
// when there's no stream, we need to
- lock.callback = doRenderWithBodyLocked;
+ lock.onReceiveValue = doRenderWithBodyLocked;
lock.task = this;
return;
@@ -1972,7 +1945,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.request_body_buf.appendSlice(this.allocator, chunk) catch @panic("Out of memory while allocating request body");
}
- pub fn onDrainRequestBody(this: *RequestContext) JSC.WebCore.DrainResult {
+ pub fn onStartStreamingRequestBody(this: *RequestContext) JSC.WebCore.DrainResult {
if (this.aborted) {
return JSC.WebCore.DrainResult{
.aborted = void{},
@@ -2003,7 +1976,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
};
}
const max_request_body_preallocate_length = 1024 * 256;
- pub fn onPull(this: *RequestContext) void {
+ pub fn onStartBuffering(this: *RequestContext) void {
const request = JSC.JSValue.c(this.request_js_object);
request.ensureStillAlive();
@@ -2013,7 +1986,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (len == 0) {
if (request.as(Request)) |req| {
var old = req.body;
- old.Locked.callback = null;
+ old.Locked.onReceiveValue = null;
req.body = .{ .Empty = .{} };
old.resolve(&req.body, this.server.globalThis);
return;
@@ -2024,7 +1997,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
if (len >= this.server.config.max_request_body_size) {
if (request.as(Request)) |req| {
var old = req.body;
- old.Locked.callback = null;
+ old.Locked.onReceiveValue = null;
req.body = .{ .Empty = .{} };
old.toError(error.RequestBodyTooLarge, this.server.globalThis);
return;
@@ -2041,7 +2014,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// no transfer-encoding
if (request.as(Request)) |req| {
var old = req.body;
- old.Locked.callback = null;
+ old.Locked.onReceiveValue = null;
req.body = .{ .Empty = .{} };
old.resolve(&req.body, this.server.globalThis);
return;
@@ -2053,12 +2026,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.resp.onData(*RequestContext, onBufferedBodyChunk, this);
}
- pub fn onPullCallback(this: *anyopaque) void {
- onPull(bun.cast(*RequestContext, this));
+ pub fn onStartBufferingCallback(this: *anyopaque) void {
+ onStartBuffering(bun.cast(*RequestContext, this));
}
- pub fn onDrainRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult {
- return onDrainRequestBody(bun.cast(*RequestContext, this));
+ pub fn onStartStreamingRequestBodyCallback(this: *anyopaque) JSC.WebCore.DrainResult {
+ return onStartStreamingRequestBody(bun.cast(*RequestContext, this));
}
pub const Export = shim.exportFunctions(.{
@@ -2544,8 +2517,8 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.Locked = .{
.task = ctx,
.global = this.globalThis,
- .onPull = RequestContext.onPullCallback,
- .onDrain = RequestContext.onDrainRequestBodyCallback,
+ .onStartBuffering = RequestContext.onStartBufferingCallback,
+ .onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback,
},
};
resp.onData(*RequestContext, RequestContext.onBufferedBodyChunk, ctx);
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index dd3cfa3ca..0ee3843f1 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -214,17 +214,22 @@ void WebCore__FetchHeaders__count(WebCore__FetchHeaders* headers, uint32_t* coun
*count = headers->size();
*buf_len = i;
}
+
+typedef struct ZigSliceString {
+ const unsigned char* ptr;
+ size_t len;
+} ZigSliceString;
+
typedef struct PicoHTTPHeader {
- unsigned const char* name;
- size_t name_len;
- unsigned const char* value;
- size_t value_len;
+ ZigSliceString name;
+ ZigSliceString value;
} PicoHTTPHeader;
typedef struct PicoHTTPHeaders {
const PicoHTTPHeader* ptr;
size_t len;
} PicoHTTPHeaders;
+
WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void* arg1)
{
PicoHTTPHeaders pico_headers = *reinterpret_cast<const PicoHTTPHeaders*>(arg1);
@@ -233,29 +238,36 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void*
if (pico_headers.len > 0) {
HTTPHeaderMap map = HTTPHeaderMap();
- for (size_t j = 0; j < pico_headers.len; j++) {
+ size_t end = pico_headers.len;
+
+ for (size_t j = 0; j < end; j++) {
PicoHTTPHeader header = pico_headers.ptr[j];
- if (header.value_len == 0)
+ if (header.value.len == 0)
continue;
- StringView nameView = StringView(reinterpret_cast<const char*>(header.name), header.name_len);
+ StringView nameView = StringView(reinterpret_cast<const char*>(header.name.ptr), header.name.len);
LChar* data = nullptr;
- auto value = String::createUninitialized(header.value_len, data);
- memcpy(data, header.value, header.value_len);
+ auto value = String::createUninitialized(header.value.len, data);
+ memcpy(data, header.value.ptr, header.value.len);
HTTPHeaderName name;
+ // memory safety: the header names must be cloned if they're not statically known
+ // the value must also be cloned
+ // isolatedCopy() doesn't actually clone, it's only for threadlocal isolation
if (WebCore::findHTTPHeaderName(nameView, name)) {
- map.add(name, WTFMove(value));
+ map.add(name, value);
} else {
- map.setUncommonHeader(nameView.toString().isolatedCopy(), WTFMove(value));
+ // the case where we do not need to clone the name
+ // when the header name is already present in the list
+ // we don't have that information here, so map.setUncommonHeaderCloneName exists
+ map.setUncommonHeaderCloneName(nameView, value);
}
}
headers->setInternalHeaders(WTFMove(map));
}
-
return headers;
}
WebCore::FetchHeaders* WebCore__FetchHeaders__createFromUWS(JSC__JSGlobalObject* arg0, void* arg1)
diff --git a/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp b/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp
index bd516cf6c..383b0a85f 100644
--- a/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp
+++ b/src/bun.js/bindings/webcore/HTTPHeaderMap.cpp
@@ -118,6 +118,20 @@ void HTTPHeaderMap::setUncommonHeader(const String& name, const String& value)
m_uncommonHeaders[index].value = value;
}
+void HTTPHeaderMap::setUncommonHeaderCloneName(const StringView name, const String& value)
+{
+ auto index = m_uncommonHeaders.findIf([&](auto& header) {
+ return equalIgnoringASCIICase(header.key, name);
+ });
+ if (index == notFound) {
+ LChar* ptr = nullptr;
+ auto nameCopy = WTF::String::createUninitialized(name.length(), ptr);
+ memcpy(ptr, name.characters8(), name.length());
+ m_uncommonHeaders.append(UncommonHeader { nameCopy, value });
+ } else
+ m_uncommonHeaders[index].value = value;
+}
+
void HTTPHeaderMap::add(const String& name, const String& value)
{
HTTPHeaderName headerName;
diff --git a/src/bun.js/bindings/webcore/HTTPHeaderMap.h b/src/bun.js/bindings/webcore/HTTPHeaderMap.h
index e95c9d9f0..0b4242b57 100644
--- a/src/bun.js/bindings/webcore/HTTPHeaderMap.h
+++ b/src/bun.js/bindings/webcore/HTTPHeaderMap.h
@@ -212,6 +212,7 @@ public:
template <class Encoder> void encode(Encoder&) const;
template <class Decoder> static WARN_UNUSED_RETURN bool decode(Decoder&, HTTPHeaderMap&);
void setUncommonHeader(const String& name, const String& value);
+ void setUncommonHeaderCloneName(const StringView name, const String& value);
private:
WEBCORE_EXPORT String getUncommonHeader(const String& name) const;
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index b3111e81f..5d641831f 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -1169,7 +1169,7 @@ pub const Blob = struct {
bun.default_allocator.destroy(this);
},
.Locked => {
- value.Locked.callback = thenWrap;
+ value.Locked.onReceiveValue = thenWrap;
value.Locked.task = this;
},
}
@@ -1351,7 +1351,7 @@ pub const Blob = struct {
};
response.body.value.Locked.task = task;
- response.body.value.Locked.callback = WriteFileWaitFromLockedValueTask.thenWrap;
+ response.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
return promise.asValue(ctx.ptr()).asObjectRef();
},
@@ -1380,7 +1380,7 @@ pub const Blob = struct {
};
request.body.Locked.task = task;
- request.body.Locked.callback = WriteFileWaitFromLockedValueTask.thenWrap;
+ request.body.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
return promise.asValue(ctx.ptr()).asObjectRef();
},
@@ -4220,6 +4220,7 @@ pub const Body = struct {
} else if (this.value == .InternalBlob or this.value == .InlineBlob) {
try formatter.printComma(Writer, writer, enable_ansi_colors);
try writer.writeAll("\n");
+ try formatter.writeIndent(Writer, writer);
try Blob.writeFormatForSize(this.value.size(), writer, enable_ansi_colors);
} else if (this.value == .Locked) {
if (this.value.Locked.readable) |stream| {
@@ -4311,16 +4312,73 @@ pub const Body = struct {
global: *JSGlobalObject,
task: ?*anyopaque = null,
+
/// runs after the data is available.
- callback: ?fn (ctx: *anyopaque, value: *Value) void = null,
+ onReceiveValue: ?fn (ctx: *anyopaque, value: *Value) void = null,
+
/// conditionally runs when requesting data
/// used in HTTP server to ignore request bodies unless asked for it
- onPull: ?fn (ctx: *anyopaque) void = null,
- onDrain: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
+ onStartBuffering: ?fn (ctx: *anyopaque) void = null,
+
+ onStartStreaming: ?fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
deinit: bool = false,
action: Action = Action.none,
+ pub fn toAnyBlob(this: *PendingValue) ?AnyBlob {
+ if (this.promise != null)
+ return null;
+
+ return this.toAnyBlobAllowPromise();
+ }
+
+ pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob {
+ const stream = this.readable orelse return null;
+
+ switch (stream.ptr) {
+ .Blob => |blobby| {
+ var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global);
+ blob.offset = blobby.offset;
+ blob.size = blobby.remain;
+ blob.store.?.ref();
+ stream.detach(this.global);
+ stream.done();
+ blobby.deinit();
+ this.readable = null;
+ return AnyBlob{ .Blob = blob };
+ },
+ .File => |blobby| {
+ var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global);
+ blobby.store.ref();
+
+ // it should be lazy, file shouldn't have opened yet.
+ std.debug.assert(!blobby.started);
+
+ stream.detach(this.global);
+ blobby.deinit();
+ stream.done();
+ this.readable = null;
+ return AnyBlob{ .Blob = blob };
+ },
+ .Bytes => |bytes| {
+
+ // If we've received the complete body by the time this function is called
+ // we can avoid streaming it and convert it to a Blob
+ if (bytes.has_received_last_chunk) {
+ stream.detach(this.global);
+ var blob: JSC.WebCore.AnyBlob = undefined;
+ blob.from(bytes.buffer);
+ bytes.parent().deinit();
+ this.readable = null;
+ return blob;
+ }
+
+ return null;
+ },
+ else => return null,
+ }
+ }
+
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
value.action = action;
@@ -4358,9 +4416,9 @@ pub const Body = struct {
const promise_value = promise.asValue(globalThis);
value.promise = promise_value;
- if (value.onPull) |onPull| {
- value.onPull = null;
- onPull(value.task.?);
+ if (value.onStartBuffering) |onStartBuffering| {
+ value.onStartBuffering = null;
+ onStartBuffering(value.task.?);
}
return promise_value;
}
@@ -4375,6 +4433,7 @@ pub const Body = struct {
};
};
+ /// This is a duplex stream!
pub const Value = union(Tag) {
Blob: Blob,
/// Single-use Blob
@@ -4387,6 +4446,19 @@ pub const Body = struct {
Empty: void,
Error: JSValue,
+ pub fn toBlobIfPossible(this: *Value) void {
+ if (this.* != .Locked)
+ return;
+
+ if (this.Locked.toAnyBlob()) |blob| {
+ this.* = switch (blob) {
+ .Blob => .{ .Blob = blob.Blob },
+ .InternalBlob => .{ .InternalBlob = blob.InternalBlob },
+ .InlineBlob => .{ .InlineBlob = blob.InlineBlob },
+ };
+ }
+ }
+
pub fn size(this: *const Value) Blob.SizeType {
return switch (this.*) {
.Blob => this.Blob.size,
@@ -4409,9 +4481,10 @@ pub const Body = struct {
var _blob = InlineBlob{
.bytes = undefined,
.was_string = was_string,
+ .len = @truncate(InlineBlob.IntSize, data.len),
};
@memcpy(&_blob.bytes, data.ptr, data.len);
-
+ allocator.free(data);
return Value{
.InlineBlob = _blob,
};
@@ -4469,8 +4542,8 @@ pub const Body = struct {
.estimated_size = 0,
};
- if (locked.onDrain) |drain| {
- locked.onDrain = null;
+ if (locked.onStartStreaming) |drain| {
+ locked.onStartStreaming = null;
drain_result = drain(locked.task.?);
}
@@ -4696,9 +4769,10 @@ pub const Body = struct {
locked.readable = null;
}
- if (locked.callback) |callback| {
- locked.callback = null;
+ if (locked.onReceiveValue) |callback| {
+ locked.onReceiveValue = null;
callback(locked.task.?, new);
+ return;
}
if (locked.promise) |promise_| {
@@ -4761,6 +4835,8 @@ pub const Body = struct {
}
pub fn use(this: *Value) Blob {
+ this.toBlobIfPossible();
+
switch (this.*) {
.Blob => {
var new_blob = this.Blob;
@@ -4802,25 +4878,28 @@ pub const Body = struct {
}
}
+ pub fn tryUseAsAnyBlob(this: *Value) ?AnyBlob {
+ const any_blob: AnyBlob = switch (this.*) {
+ .Blob => AnyBlob{ .Blob = this.Blob },
+ .InternalBlob => AnyBlob{ .InternalBlob = this.InternalBlob },
+ .InlineBlob => AnyBlob{ .InlineBlob = this.InlineBlob },
+ .Locked => this.Locked.toAnyBlobAllowPromise() orelse return null,
+ else => return null,
+ };
+
+ this.* = .{ .Used = .{} };
+ return any_blob;
+ }
+
pub fn useAsAnyBlob(this: *Value) AnyBlob {
const any_blob: AnyBlob = switch (this.*) {
.Blob => .{ .Blob = this.Blob },
.InternalBlob => .{ .InternalBlob = this.InternalBlob },
.InlineBlob => .{ .InlineBlob = this.InlineBlob },
+ .Locked => this.Locked.toAnyBlobAllowPromise() orelse AnyBlob{ .InlineBlob = .{ .len = 0 } },
else => .{ .Blob = Blob.initEmpty(undefined) },
};
- if (this.* == .Locked) {
- if (this.Locked.promise) |prom| {
- prom.unprotect();
- }
-
- if (this.Locked.readable) |readable| {
- readable.done();
- // TODO: convert the known streams back into blobs
- }
- }
-
this.* = .{ .Used = {} };
return any_blob;
}
@@ -4845,9 +4924,9 @@ pub const Body = struct {
}
this.* = .{ .Error = error_instance };
- if (locked.callback) |callback| {
- locked.callback = null;
- callback(locked.task.?, this);
+ if (locked.onReceiveValue) |onReceiveValue| {
+ locked.onReceiveValue = null;
+ onReceiveValue(locked.task.?, this);
}
return;
}
@@ -5049,7 +5128,11 @@ pub const Request = struct {
} else if (this.body == .InternalBlob or this.body == .InlineBlob) {
try writer.writeAll("\n");
try formatter.writeIndent(Writer, writer);
- try Blob.writeFormatForSize(this.body.slice().len, writer, enable_ansi_colors);
+ if (this.body.size() == 0) {
+ try Blob.initEmpty(undefined).writeFormat(formatter, writer, enable_ansi_colors);
+ } else {
+ try Blob.writeFormatForSize(this.body.size(), writer, enable_ansi_colors);
+ }
} else if (this.body == .Locked) {
if (this.body.Locked.readable) |stream| {
try writer.writeAll("\n");
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index e39d14c93..e6f8b2378 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -737,6 +737,11 @@ pub const Sink = struct {
status: Status = Status.closed,
used: bool = false,
+ pub const pending = Sink{
+ .ptr = @intToPtr(*anyopaque, 0xaaaaaaaa),
+ .vtable = undefined,
+ };
+
pub const Status = enum {
ready,
closed,
@@ -2663,6 +2668,28 @@ pub const ByteBlobLoader = struct {
pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
};
+pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
+
+pub const Pipe = struct {
+ ctx: ?*anyopaque = null,
+ onPipe: ?PipeFunction = null,
+
+ pub fn New(comptime Type: type, comptime Function: anytype) type {
+ return struct {
+ pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void {
+ Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator);
+ }
+
+ pub fn init(self: *Type) Pipe {
+ return Pipe{
+ .ctx = self,
+ .onPipe = pipe,
+ };
+ }
+ };
+ }
+};
+
pub const ByteStream = struct {
buffer: std.ArrayList(u8) = .{
.allocator = bun.default_allocator,
@@ -2676,34 +2703,12 @@ pub const ByteStream = struct {
},
done: bool = false,
pending_buffer: []u8 = &.{},
- pending_value: ?*JSC.napi.Ref = null,
+ pending_value: JSC.Strong = .{},
offset: usize = 0,
highWaterMark: Blob.SizeType = 0,
pipe: Pipe = .{},
size_hint: Blob.SizeType = 0,
- pub const Pipe = struct {
- ctx: ?*anyopaque = null,
- onPipe: ?PipeFunction = null,
-
- pub fn New(comptime Type: type, comptime Function: anytype) type {
- return struct {
- pub fn pipe(self: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void {
- Function(@ptrCast(*Type, @alignCast(@alignOf(Type), self)), stream, allocator);
- }
-
- pub fn init(self: *Type) Pipe {
- return Pipe{
- .ctx = self,
- .onPipe = pipe,
- };
- }
- };
- }
- };
-
- pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void;
-
pub const tag = ReadableStream.Tag.Bytes;
pub fn setup(this: *ByteStream) void {
@@ -2727,12 +2732,10 @@ pub const ByteStream = struct {
}
pub fn value(this: *@This()) JSValue {
- if (this.pending_value == null)
+ const result = this.pending_value.get() orelse {
return .zero;
-
- const result = this.pending_value.?.get();
- this.pending_value.?.set(.zero);
-
+ };
+ this.pending_value.clear();
return result;
}
@@ -2850,11 +2853,7 @@ pub const ByteStream = struct {
pub fn setValue(this: *@This(), view: JSC.JSValue) void {
JSC.markBinding();
- if (this.pending_value) |pending| {
- pending.set(view);
- } else {
- this.pending_value = JSC.napi.Ref.create(this.parent().globalThis, view);
- }
+ this.pending_value.set(this.parent().globalThis, view);
}
pub fn parent(this: *@This()) *Source {
@@ -2921,10 +2920,7 @@ pub const ByteStream = struct {
const view = this.value();
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
this.done = true;
- if (this.pending_value) |ref| {
- this.pending_value = null;
- ref.destroy();
- }
+ this.pending_value.deinit();
if (view != .zero) {
this.pending_buffer = &.{};
@@ -2937,10 +2933,7 @@ pub const ByteStream = struct {
JSC.markBinding();
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
- if (this.pending_value) |ref| {
- this.pending_value = null;
- ref.destroy();
- }
+ this.pending_value.deinit();
if (!this.done) {
this.done = true;
@@ -3564,3 +3557,4 @@ pub fn NewReadyWatcher(
// pub fn onError(this: *Streamer): anytype,
// };
// }
+
diff --git a/src/bun_js.zig b/src/bun_js.zig
index ebd0c0856..eda807188 100644
--- a/src/bun_js.zig
+++ b/src/bun_js.zig
@@ -149,7 +149,7 @@ pub const Run = struct {
while (this.vm.eventLoop().tasks.count > 0 or this.vm.active_tasks > 0 or this.vm.uws_event_loop.?.active > 0) {
this.vm.tick();
- if (this.vm.eventLoop().tickConcurrentWithCount() == 0) {
+ if (this.vm.uws_event_loop.?.num_polls > 0 or this.vm.uws_event_loop.?.active > 0) {
this.vm.uws_event_loop.?.tick();
}
}
diff --git a/src/deps/uws.zig b/src/deps/uws.zig
index 4d6fa05e2..8f1adcc1d 100644
--- a/src/deps/uws.zig
+++ b/src/deps/uws.zig
@@ -261,9 +261,13 @@ pub const SocketTCP = NewSocketHandler(false);
pub const SocketTLS = NewSocketHandler(true);
pub const Timer = opaque {
- pub fn create(loop: *Loop, falltrhough: bool, ptr: anytype) *Timer {
+ pub fn create(loop: *Loop, ptr: anytype) *Timer {
const Type = @TypeOf(ptr);
- return us_create_timer(loop, @as(i32, @boolToInt(falltrhough)), @sizeOf(Type));
+
+ // never fallthrough poll
+ // the problem is uSockets hardcodes it on the other end
+ // so we can never free non-fallthrough polls
+ return us_create_timer(loop, 0, @sizeOf(Type));
}
pub fn set(this: *Timer, ptr: anytype, cb: ?fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void {